From: Brian Warner Date: Wed, 18 Feb 2009 22:42:34 +0000 (-0700) Subject: test_system: split off checker tests to test_deepcheck.py, this file is too big X-Git-Tag: allmydata-tahoe-1.4.0~192 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/...?a=commitdiff_plain;h=b398a69f1468a82ca05c9ef6e31304438c6b95b5;p=tahoe-lafs%2Ftahoe-lafs.git test_system: split off checker tests to test_deepcheck.py, this file is too big --- diff --git a/src/allmydata/test/test_deepcheck.py b/src/allmydata/test/test_deepcheck.py new file mode 100644 index 00000000..f8d4cfc4 --- /dev/null +++ b/src/allmydata/test/test_deepcheck.py @@ -0,0 +1,1157 @@ + +import os, simplejson, urllib +from cStringIO import StringIO +from twisted.trial import unittest +from twisted.internet import defer +from twisted.internet import threads # CLI tests use deferToThread +from allmydata.immutable import upload +from allmydata.util import idlib +from allmydata.util import base32 +from allmydata.scripts import runner +from allmydata.interfaces import ICheckResults, ICheckAndRepairResults, \ + IDeepCheckResults, IDeepCheckAndRepairResults +from allmydata.monitor import Monitor, OperationCancelledError +from twisted.web.client import getPage + +from allmydata.test.common import SystemTestMixin, ErrorMixin + +class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin): + + def _run_cli(self, argv): + stdout, stderr = StringIO(), StringIO() + # this can only do synchronous operations + assert argv[0] == "debug" + runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr) + return stdout.getvalue() + + def test_good(self): + self.basedir = self.mktemp() + d = self.set_up_nodes() + CONTENTS = "a little bit of data" + d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS)) + def _created(node): + self.node = node + si = self.node.get_storage_index() + d.addCallback(_created) + # now make sure the webapi verifier sees no problems + def _do_check(res): + url = (self.webish_url + + "uri/%s" % urllib.quote(self.node.get_uri()) + + "?t=check&verify=true") + return getPage(url, method="POST") + d.addCallback(_do_check) + def _got_results(out): + self.failUnless("Healthy : Healthy" in out, out) + self.failUnless("Recoverable Versions: 10*seq1-" in out, out) + self.failIf("Not Healthy!" in out, out) + self.failIf("Unhealthy" in out, out) + self.failIf("Corrupt Shares" in out, out) + d.addCallback(_got_results) + d.addErrback(self.explain_web_error) + return d + + def test_corrupt(self): + self.basedir = self.mktemp() + d = self.set_up_nodes() + CONTENTS = "a little bit of data" + d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS)) + def _created(node): + self.node = node + si = self.node.get_storage_index() + out = self._run_cli(["debug", "find-shares", base32.b2a(si), + self.clients[1].basedir]) + files = out.split("\n") + # corrupt one of them, using the CLI debug command + f = files[0] + shnum = os.path.basename(f) + nodeid = self.clients[1].nodeid + nodeid_prefix = idlib.shortnodeid_b2a(nodeid) + self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum) + out = self._run_cli(["debug", "corrupt-share", files[0]]) + d.addCallback(_created) + # now make sure the webapi verifier notices it + def _do_check(res): + url = (self.webish_url + + "uri/%s" % urllib.quote(self.node.get_uri()) + + "?t=check&verify=true") + return getPage(url, method="POST") + d.addCallback(_do_check) + def _got_results(out): + self.failUnless("Not Healthy!" in out, out) + self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out) + self.failUnless("Corrupt Shares:" in out, out) + d.addCallback(_got_results) + + # now make sure the webapi repairer can fix it + def _do_repair(res): + url = (self.webish_url + + "uri/%s" % urllib.quote(self.node.get_uri()) + + "?t=check&verify=true&repair=true") + return getPage(url, method="POST") + d.addCallback(_do_repair) + def _got_repair_results(out): + self.failUnless("
Repair successful
" in out, out) + d.addCallback(_got_repair_results) + d.addCallback(_do_check) + def _got_postrepair_results(out): + self.failIf("Not Healthy!" in out, out) + self.failUnless("Recoverable Versions: 10*seq" in out, out) + d.addCallback(_got_postrepair_results) + d.addErrback(self.explain_web_error) + + return d + + def test_delete_share(self): + self.basedir = self.mktemp() + d = self.set_up_nodes() + CONTENTS = "a little bit of data" + d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS)) + def _created(node): + self.node = node + si = self.node.get_storage_index() + out = self._run_cli(["debug", "find-shares", base32.b2a(si), + self.clients[1].basedir]) + files = out.split("\n") + # corrupt one of them, using the CLI debug command + f = files[0] + shnum = os.path.basename(f) + nodeid = self.clients[1].nodeid + nodeid_prefix = idlib.shortnodeid_b2a(nodeid) + self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum) + os.unlink(files[0]) + d.addCallback(_created) + # now make sure the webapi checker notices it + def _do_check(res): + url = (self.webish_url + + "uri/%s" % urllib.quote(self.node.get_uri()) + + "?t=check&verify=false") + return getPage(url, method="POST") + d.addCallback(_do_check) + def _got_results(out): + self.failUnless("Not Healthy!" in out, out) + self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out) + self.failIf("Corrupt Shares" in out, out) + d.addCallback(_got_results) + + # now make sure the webapi repairer can fix it + def _do_repair(res): + url = (self.webish_url + + "uri/%s" % urllib.quote(self.node.get_uri()) + + "?t=check&verify=false&repair=true") + return getPage(url, method="POST") + d.addCallback(_do_repair) + def _got_repair_results(out): + self.failUnless("Repair successful" in out) + d.addCallback(_got_repair_results) + d.addCallback(_do_check) + def _got_postrepair_results(out): + self.failIf("Not Healthy!" in out, out) + self.failUnless("Recoverable Versions: 10*seq" in out) + d.addCallback(_got_postrepair_results) + d.addErrback(self.explain_web_error) + + return d + + +class DeepCheckBase(SystemTestMixin, ErrorMixin): + + def web_json(self, n, **kwargs): + kwargs["output"] = "json" + d = self.web(n, "POST", **kwargs) + d.addCallback(self.decode_json) + return d + + def decode_json(self, (s,url)): + try: + data = simplejson.loads(s) + except ValueError: + self.fail("%s: not JSON: '%s'" % (url, s)) + return data + + def parse_streamed_json(self, s): + for unit in s.split("\n"): + if not unit: + # stream should end with a newline, so split returns "" + continue + yield simplejson.loads(unit) + + def web(self, n, method="GET", **kwargs): + # returns (data, url) + url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri()) + + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()])) + d = getPage(url, method=method) + d.addCallback(lambda data: (data,url)) + return d + + def wait_for_operation(self, ignored, ophandle): + url = self.webish_url + "operations/" + ophandle + url += "?t=status&output=JSON" + d = getPage(url) + def _got(res): + try: + data = simplejson.loads(res) + except ValueError: + self.fail("%s: not JSON: '%s'" % (url, res)) + if not data["finished"]: + d = self.stall(delay=1.0) + d.addCallback(self.wait_for_operation, ophandle) + return d + return data + d.addCallback(_got) + return d + + def get_operation_results(self, ignored, ophandle, output=None): + url = self.webish_url + "operations/" + ophandle + url += "?t=status" + if output: + url += "&output=" + output + d = getPage(url) + def _got(res): + if output and output.lower() == "json": + try: + return simplejson.loads(res) + except ValueError: + self.fail("%s: not JSON: '%s'" % (url, res)) + return res + d.addCallback(_got) + return d + + def slow_web(self, n, output=None, **kwargs): + # use ophandle= + handle = base32.b2a(os.urandom(4)) + d = self.web(n, "POST", ophandle=handle, **kwargs) + d.addCallback(self.wait_for_operation, handle) + d.addCallback(self.get_operation_results, handle, output=output) + return d + + +class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): + # construct a small directory tree (with one dir, one immutable file, one + # mutable file, one LIT file, and a loop), and then check/examine it in + # various ways. + + def set_up_tree(self, ignored): + # 2.9s + + # root + # mutable + # large + # small + # small2 + # loop -> root + c0 = self.clients[0] + d = c0.create_empty_dirnode() + def _created_root(n): + self.root = n + self.root_uri = n.get_uri() + d.addCallback(_created_root) + d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents")) + d.addCallback(lambda n: self.root.set_node(u"mutable", n)) + def _created_mutable(n): + self.mutable = n + self.mutable_uri = n.get_uri() + d.addCallback(_created_mutable) + + large = upload.Data("Lots of data\n" * 1000, None) + d.addCallback(lambda ign: self.root.add_file(u"large", large)) + def _created_large(n): + self.large = n + self.large_uri = n.get_uri() + d.addCallback(_created_large) + + small = upload.Data("Small enough for a LIT", None) + d.addCallback(lambda ign: self.root.add_file(u"small", small)) + def _created_small(n): + self.small = n + self.small_uri = n.get_uri() + d.addCallback(_created_small) + + small2 = upload.Data("Small enough for a LIT too", None) + d.addCallback(lambda ign: self.root.add_file(u"small2", small2)) + def _created_small2(n): + self.small2 = n + self.small2_uri = n.get_uri() + d.addCallback(_created_small2) + + d.addCallback(lambda ign: self.root.set_node(u"loop", self.root)) + return d + + def check_is_healthy(self, cr, n, where, incomplete=False): + self.failUnless(ICheckResults.providedBy(cr), where) + self.failUnless(cr.is_healthy(), where) + self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(), + where) + self.failUnlessEqual(cr.get_storage_index_string(), + base32.b2a(n.get_storage_index()), where) + needs_rebalancing = bool( len(self.clients) < 10 ) + if not incomplete: + self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, str((where, cr, cr.get_data()))) + d = cr.get_data() + self.failUnlessEqual(d["count-shares-good"], 10, where) + self.failUnlessEqual(d["count-shares-needed"], 3, where) + self.failUnlessEqual(d["count-shares-expected"], 10, where) + if not incomplete: + self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where) + self.failUnlessEqual(d["count-corrupt-shares"], 0, where) + self.failUnlessEqual(d["list-corrupt-shares"], [], where) + if not incomplete: + self.failUnlessEqual(sorted(d["servers-responding"]), + sorted([c.nodeid for c in self.clients]), + where) + self.failUnless("sharemap" in d, str((where, d))) + all_serverids = set() + for (shareid, serverids) in d["sharemap"].items(): + all_serverids.update(serverids) + self.failUnlessEqual(sorted(all_serverids), + sorted([c.nodeid for c in self.clients]), + where) + + self.failUnlessEqual(d["count-wrong-shares"], 0, where) + self.failUnlessEqual(d["count-recoverable-versions"], 1, where) + self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where) + + + def check_and_repair_is_healthy(self, cr, n, where, incomplete=False): + self.failUnless(ICheckAndRepairResults.providedBy(cr), (where, cr)) + self.failUnless(cr.get_pre_repair_results().is_healthy(), where) + self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete) + self.failUnless(cr.get_post_repair_results().is_healthy(), where) + self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete) + self.failIf(cr.get_repair_attempted(), where) + + def deep_check_is_healthy(self, cr, num_healthy, where): + self.failUnless(IDeepCheckResults.providedBy(cr)) + self.failUnlessEqual(cr.get_counters()["count-objects-healthy"], + num_healthy, where) + + def deep_check_and_repair_is_healthy(self, cr, num_healthy, where): + self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where) + c = cr.get_counters() + self.failUnlessEqual(c["count-objects-healthy-pre-repair"], + num_healthy, where) + self.failUnlessEqual(c["count-objects-healthy-post-repair"], + num_healthy, where) + self.failUnlessEqual(c["count-repairs-attempted"], 0, where) + + def test_good(self): + self.basedir = self.mktemp() + d = self.set_up_nodes() + d.addCallback(self.set_up_tree) + d.addCallback(self.do_stats) + d.addCallback(self.do_web_stream_manifest) + d.addCallback(self.do_web_stream_check) + d.addCallback(self.do_test_check_good) + d.addCallback(self.do_test_web_good) + d.addCallback(self.do_test_cli_good) + d.addErrback(self.explain_web_error) + d.addErrback(self.explain_error) + return d + + def do_stats(self, ignored): + d = defer.succeed(None) + d.addCallback(lambda ign: self.root.start_deep_stats().when_done()) + d.addCallback(self.check_stats_good) + return d + + def check_stats_good(self, s): + self.failUnlessEqual(s["count-directories"], 1) + self.failUnlessEqual(s["count-files"], 4) + self.failUnlessEqual(s["count-immutable-files"], 1) + self.failUnlessEqual(s["count-literal-files"], 2) + self.failUnlessEqual(s["count-mutable-files"], 1) + # don't check directories: their size will vary + # s["largest-directory"] + # s["size-directories"] + self.failUnlessEqual(s["largest-directory-children"], 5) + self.failUnlessEqual(s["largest-immutable-file"], 13000) + # to re-use this function for both the local + # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we + # coerce the result into a list of tuples. dirnode.start_deep_stats() + # returns a list of tuples, but JSON only knows about lists., so + # t=start-deep-stats returns a list of lists. + histogram = [tuple(stuff) for stuff in s["size-files-histogram"]] + self.failUnlessEqual(histogram, [(11, 31, 2), + (10001, 31622, 1), + ]) + self.failUnlessEqual(s["size-immutable-files"], 13000) + self.failUnlessEqual(s["size-literal-files"], 48) + + def do_web_stream_manifest(self, ignored): + d = self.web(self.root, method="POST", t="stream-manifest") + d.addCallback(lambda (output,url): + self._check_streamed_manifest(output)) + return d + + def _check_streamed_manifest(self, output): + units = list(self.parse_streamed_json(output)) + files = [u for u in units if u["type"] in ("file", "directory")] + assert units[-1]["type"] == "stats" + stats = units[-1]["stats"] + self.failUnlessEqual(len(files), 5) + # [root,mutable,large] are distributed, [small,small2] are not + self.failUnlessEqual(len([f for f in files + if f["verifycap"] is not None]), 3) + self.failUnlessEqual(len([f for f in files + if f["verifycap"] is None]), 2) + self.failUnlessEqual(len([f for f in files + if f["repaircap"] is not None]), 3) + self.failUnlessEqual(len([f for f in files + if f["repaircap"] is None]), 2) + self.failUnlessEqual(len([f for f in files + if f["storage-index"] is not None]), 3) + self.failUnlessEqual(len([f for f in files + if f["storage-index"] is None]), 2) + # make sure that a mutable file has filecap==repaircap!=verifycap + mutable = [f for f in files + if f["cap"] is not None + and f["cap"].startswith("URI:SSK:")][0] + self.failUnlessEqual(mutable["cap"], self.mutable_uri) + self.failIfEqual(mutable["cap"], mutable["verifycap"]) + self.failUnlessEqual(mutable["cap"], mutable["repaircap"]) + # for immutable file, verifycap==repaircap!=filecap + large = [f for f in files + if f["cap"] is not None + and f["cap"].startswith("URI:CHK:")][0] + self.failUnlessEqual(large["cap"], self.large_uri) + self.failIfEqual(large["cap"], large["verifycap"]) + self.failUnlessEqual(large["verifycap"], large["repaircap"]) + self.check_stats_good(stats) + + def do_web_stream_check(self, ignored): + return + d = self.web(self.root, t="stream-deep-check") + def _check(res): + units = list(self.parse_streamed_json(res)) + files = [u for u in units if u["type"] in ("file", "directory")] + assert units[-1]["type"] == "stats" + stats = units[-1]["stats"] + # ... + d.addCallback(_check) + return d + + def do_test_check_good(self, ignored): + d = defer.succeed(None) + # check the individual items + d.addCallback(lambda ign: self.root.check(Monitor())) + d.addCallback(self.check_is_healthy, self.root, "root") + d.addCallback(lambda ign: self.mutable.check(Monitor())) + d.addCallback(self.check_is_healthy, self.mutable, "mutable") + d.addCallback(lambda ign: self.large.check(Monitor())) + d.addCallback(self.check_is_healthy, self.large, "large") + d.addCallback(lambda ign: self.small.check(Monitor())) + d.addCallback(self.failUnlessEqual, None, "small") + d.addCallback(lambda ign: self.small2.check(Monitor())) + d.addCallback(self.failUnlessEqual, None, "small2") + + # and again with verify=True + d.addCallback(lambda ign: self.root.check(Monitor(), verify=True)) + d.addCallback(self.check_is_healthy, self.root, "root") + d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True)) + d.addCallback(self.check_is_healthy, self.mutable, "mutable") + d.addCallback(lambda ign: self.large.check(Monitor(), verify=True)) + d.addCallback(self.check_is_healthy, self.large, "large", incomplete=True) + d.addCallback(lambda ign: self.small.check(Monitor(), verify=True)) + d.addCallback(self.failUnlessEqual, None, "small") + d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True)) + d.addCallback(self.failUnlessEqual, None, "small2") + + # and check_and_repair(), which should be a nop + d.addCallback(lambda ign: self.root.check_and_repair(Monitor())) + d.addCallback(self.check_and_repair_is_healthy, self.root, "root") + d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor())) + d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable") + #TODO d.addCallback(lambda ign: self.large.check_and_repair(Monitor())) + #TODO d.addCallback(self.check_and_repair_is_healthy, self.large, "large") + #TODO d.addCallback(lambda ign: self.small.check_and_repair(Monitor())) + #TODO d.addCallback(self.failUnlessEqual, None, "small") + #TODO d.addCallback(lambda ign: self.small2.check_and_repair(Monitor())) + #TODO d.addCallback(self.failUnlessEqual, None, "small2") + + # check_and_repair(verify=True) + d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True)) + d.addCallback(self.check_and_repair_is_healthy, self.root, "root") + d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True)) + d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable") + #TODO d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True)) + #TODO d.addCallback(self.check_and_repair_is_healthy, self.large, "large", + #TODO incomplete=True) + #TODO d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True)) + #TODO d.addCallback(self.failUnlessEqual, None, "small") + #TODO d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True)) + #TODO d.addCallback(self.failUnlessEqual, None, "small2") + + + # now deep-check the root, with various verify= and repair= options + d.addCallback(lambda ign: + self.root.start_deep_check().when_done()) + d.addCallback(self.deep_check_is_healthy, 3, "root") + d.addCallback(lambda ign: + self.root.start_deep_check(verify=True).when_done()) + d.addCallback(self.deep_check_is_healthy, 3, "root") + d.addCallback(lambda ign: + self.root.start_deep_check_and_repair().when_done()) + d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root") + d.addCallback(lambda ign: + self.root.start_deep_check_and_repair(verify=True).when_done()) + d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root") + + # and finally, start a deep-check, but then cancel it. + d.addCallback(lambda ign: self.root.start_deep_check()) + def _checking(monitor): + monitor.cancel() + d = monitor.when_done() + # this should fire as soon as the next dirnode.list finishes. + # TODO: add a counter to measure how many list() calls are made, + # assert that no more than one gets to run before the cancel() + # takes effect. + def _finished_normally(res): + self.fail("this was supposed to fail, not finish normally") + def _cancelled(f): + f.trap(OperationCancelledError) + d.addCallbacks(_finished_normally, _cancelled) + return d + d.addCallback(_checking) + + return d + + def json_check_is_healthy(self, data, n, where, incomplete=False): + + self.failUnlessEqual(data["storage-index"], + base32.b2a(n.get_storage_index()), where) + self.failUnless("summary" in data, (where, data)) + self.failUnlessEqual(data["summary"].lower(), "healthy", + "%s: '%s'" % (where, data["summary"])) + r = data["results"] + self.failUnlessEqual(r["healthy"], True, where) + needs_rebalancing = bool( len(self.clients) < 10 ) + if not incomplete: + self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where) + self.failUnlessEqual(r["count-shares-good"], 10, where) + self.failUnlessEqual(r["count-shares-needed"], 3, where) + self.failUnlessEqual(r["count-shares-expected"], 10, where) + if not incomplete: + self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where) + self.failUnlessEqual(r["count-corrupt-shares"], 0, where) + self.failUnlessEqual(r["list-corrupt-shares"], [], where) + if not incomplete: + self.failUnlessEqual(sorted(r["servers-responding"]), + sorted([idlib.nodeid_b2a(c.nodeid) + for c in self.clients]), where) + self.failUnless("sharemap" in r, where) + all_serverids = set() + for (shareid, serverids_s) in r["sharemap"].items(): + all_serverids.update(serverids_s) + self.failUnlessEqual(sorted(all_serverids), + sorted([idlib.nodeid_b2a(c.nodeid) + for c in self.clients]), where) + self.failUnlessEqual(r["count-wrong-shares"], 0, where) + self.failUnlessEqual(r["count-recoverable-versions"], 1, where) + self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where) + + def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False): + self.failUnlessEqual(data["storage-index"], + base32.b2a(n.get_storage_index()), where) + self.failUnlessEqual(data["repair-attempted"], False, where) + self.json_check_is_healthy(data["pre-repair-results"], + n, where, incomplete) + self.json_check_is_healthy(data["post-repair-results"], + n, where, incomplete) + + def json_full_deepcheck_is_healthy(self, data, n, where): + self.failUnlessEqual(data["root-storage-index"], + base32.b2a(n.get_storage_index()), where) + self.failUnlessEqual(data["count-objects-checked"], 3, where) + self.failUnlessEqual(data["count-objects-healthy"], 3, where) + self.failUnlessEqual(data["count-objects-unhealthy"], 0, where) + self.failUnlessEqual(data["count-corrupt-shares"], 0, where) + self.failUnlessEqual(data["list-corrupt-shares"], [], where) + self.failUnlessEqual(data["list-unhealthy-files"], [], where) + self.json_check_stats_good(data["stats"], where) + + def json_full_deepcheck_and_repair_is_healthy(self, data, n, where): + self.failUnlessEqual(data["root-storage-index"], + base32.b2a(n.get_storage_index()), where) + self.failUnlessEqual(data["count-objects-checked"], 3, where) + + self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where) + self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where) + self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where) + + self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where) + self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where) + self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where) + + self.failUnlessEqual(data["list-corrupt-shares"], [], where) + self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where) + self.failUnlessEqual(data["list-unhealthy-files"], [], where) + + self.failUnlessEqual(data["count-repairs-attempted"], 0, where) + self.failUnlessEqual(data["count-repairs-successful"], 0, where) + self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where) + + + def json_check_lit(self, data, n, where): + self.failUnlessEqual(data["storage-index"], "", where) + self.failUnlessEqual(data["results"]["healthy"], True, where) + + def json_check_stats_good(self, data, where): + self.check_stats_good(data) + + def do_test_web_good(self, ignored): + d = defer.succeed(None) + + # stats + d.addCallback(lambda ign: + self.slow_web(self.root, + t="start-deep-stats", output="json")) + d.addCallback(self.json_check_stats_good, "deep-stats") + + # check, no verify + d.addCallback(lambda ign: self.web_json(self.root, t="check")) + d.addCallback(self.json_check_is_healthy, self.root, "root") + d.addCallback(lambda ign: self.web_json(self.mutable, t="check")) + d.addCallback(self.json_check_is_healthy, self.mutable, "mutable") + d.addCallback(lambda ign: self.web_json(self.large, t="check")) + d.addCallback(self.json_check_is_healthy, self.large, "large") + d.addCallback(lambda ign: self.web_json(self.small, t="check")) + d.addCallback(self.json_check_lit, self.small, "small") + d.addCallback(lambda ign: self.web_json(self.small2, t="check")) + d.addCallback(self.json_check_lit, self.small2, "small2") + + # check and verify + d.addCallback(lambda ign: + self.web_json(self.root, t="check", verify="true")) + d.addCallback(self.json_check_is_healthy, self.root, "root+v") + d.addCallback(lambda ign: + self.web_json(self.mutable, t="check", verify="true")) + d.addCallback(self.json_check_is_healthy, self.mutable, "mutable+v") + d.addCallback(lambda ign: + self.web_json(self.large, t="check", verify="true")) + d.addCallback(self.json_check_is_healthy, self.large, "large+v", + incomplete=True) + d.addCallback(lambda ign: + self.web_json(self.small, t="check", verify="true")) + d.addCallback(self.json_check_lit, self.small, "small+v") + d.addCallback(lambda ign: + self.web_json(self.small2, t="check", verify="true")) + d.addCallback(self.json_check_lit, self.small2, "small2+v") + + # check and repair, no verify + d.addCallback(lambda ign: + self.web_json(self.root, t="check", repair="true")) + d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+r") + d.addCallback(lambda ign: + self.web_json(self.mutable, t="check", repair="true")) + d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+r") + d.addCallback(lambda ign: + self.web_json(self.large, t="check", repair="true")) + d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+r") + d.addCallback(lambda ign: + self.web_json(self.small, t="check", repair="true")) + d.addCallback(self.json_check_lit, self.small, "small+r") + d.addCallback(lambda ign: + self.web_json(self.small2, t="check", repair="true")) + d.addCallback(self.json_check_lit, self.small2, "small2+r") + + # check+verify+repair + d.addCallback(lambda ign: + self.web_json(self.root, t="check", repair="true", verify="true")) + d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+vr") + d.addCallback(lambda ign: + self.web_json(self.mutable, t="check", repair="true", verify="true")) + d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+vr") + d.addCallback(lambda ign: + self.web_json(self.large, t="check", repair="true", verify="true")) + d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+vr", incomplete=True) + d.addCallback(lambda ign: + self.web_json(self.small, t="check", repair="true", verify="true")) + d.addCallback(self.json_check_lit, self.small, "small+vr") + d.addCallback(lambda ign: + self.web_json(self.small2, t="check", repair="true", verify="true")) + d.addCallback(self.json_check_lit, self.small2, "small2+vr") + + # now run a deep-check, with various verify= and repair= flags + d.addCallback(lambda ign: + self.slow_web(self.root, t="start-deep-check", output="json")) + d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+d") + d.addCallback(lambda ign: + self.slow_web(self.root, t="start-deep-check", verify="true", + output="json")) + d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+dv") + d.addCallback(lambda ign: + self.slow_web(self.root, t="start-deep-check", repair="true", + output="json")) + d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dr") + d.addCallback(lambda ign: + self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json")) + d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dvr") + + # now look at t=info + d.addCallback(lambda ign: self.web(self.root, t="info")) + # TODO: examine the output + d.addCallback(lambda ign: self.web(self.mutable, t="info")) + d.addCallback(lambda ign: self.web(self.large, t="info")) + d.addCallback(lambda ign: self.web(self.small, t="info")) + d.addCallback(lambda ign: self.web(self.small2, t="info")) + + return d + + def _run_cli(self, argv, stdin=""): + #print "CLI:", argv + stdout, stderr = StringIO(), StringIO() + d = threads.deferToThread(runner.runner, argv, run_by_human=False, + stdin=StringIO(stdin), + stdout=stdout, stderr=stderr) + def _done(res): + return stdout.getvalue(), stderr.getvalue() + d.addCallback(_done) + return d + + def do_test_cli_good(self, ignored): + d = defer.succeed(None) + d.addCallback(lambda ign: self.do_cli_manifest_stream1()) + d.addCallback(lambda ign: self.do_cli_manifest_stream2()) + d.addCallback(lambda ign: self.do_cli_manifest_stream3()) + d.addCallback(lambda ign: self.do_cli_manifest_stream4()) + d.addCallback(lambda ign: self.do_cli_manifest_stream5()) + d.addCallback(lambda ign: self.do_cli_stats1()) + d.addCallback(lambda ign: self.do_cli_stats2()) + return d + + def _check_manifest_storage_index(self, out): + lines = [l for l in out.split("\n") if l] + self.failUnlessEqual(len(lines), 3) + self.failUnless(base32.b2a(self.root.get_storage_index()) in lines) + self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines) + self.failUnless(base32.b2a(self.large.get_storage_index()) in lines) + + def do_cli_manifest_stream1(self): + basedir = self.getdir("client0") + d = self._run_cli(["manifest", + "--node-directory", basedir, + self.root_uri]) + def _check((out,err)): + self.failUnlessEqual(err, "") + lines = [l for l in out.split("\n") if l] + self.failUnlessEqual(len(lines), 5) + caps = {} + for l in lines: + try: + cap, path = l.split(None, 1) + except ValueError: + cap = l.strip() + path = "" + caps[cap] = path + self.failUnless(self.root.get_uri() in caps) + self.failUnlessEqual(caps[self.root.get_uri()], "") + self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable") + self.failUnlessEqual(caps[self.large.get_uri()], "large") + self.failUnlessEqual(caps[self.small.get_uri()], "small") + self.failUnlessEqual(caps[self.small2.get_uri()], "small2") + d.addCallback(_check) + return d + + def do_cli_manifest_stream2(self): + basedir = self.getdir("client0") + d = self._run_cli(["manifest", + "--node-directory", basedir, + "--raw", + self.root_uri]) + def _check((out,err)): + self.failUnlessEqual(err, "") + # this should be the same as the POST t=stream-manifest output + self._check_streamed_manifest(out) + d.addCallback(_check) + return d + + def do_cli_manifest_stream3(self): + basedir = self.getdir("client0") + d = self._run_cli(["manifest", + "--node-directory", basedir, + "--storage-index", + self.root_uri]) + def _check((out,err)): + self.failUnlessEqual(err, "") + self._check_manifest_storage_index(out) + d.addCallback(_check) + return d + + def do_cli_manifest_stream4(self): + basedir = self.getdir("client0") + d = self._run_cli(["manifest", + "--node-directory", basedir, + "--verify-cap", + self.root_uri]) + def _check((out,err)): + self.failUnlessEqual(err, "") + lines = [l for l in out.split("\n") if l] + self.failUnlessEqual(len(lines), 3) + self.failUnless(self.root.get_verify_cap().to_string() in lines) + self.failUnless(self.mutable.get_verify_cap().to_string() in lines) + self.failUnless(self.large.get_verify_cap().to_string() in lines) + d.addCallback(_check) + return d + + def do_cli_manifest_stream5(self): + basedir = self.getdir("client0") + d = self._run_cli(["manifest", + "--node-directory", basedir, + "--repair-cap", + self.root_uri]) + def _check((out,err)): + self.failUnlessEqual(err, "") + lines = [l for l in out.split("\n") if l] + self.failUnlessEqual(len(lines), 3) + self.failUnless(self.root.get_repair_cap().to_string() in lines) + self.failUnless(self.mutable.get_repair_cap().to_string() in lines) + self.failUnless(self.large.get_repair_cap().to_string() in lines) + d.addCallback(_check) + return d + + def do_cli_stats1(self): + basedir = self.getdir("client0") + d = self._run_cli(["stats", + "--node-directory", basedir, + self.root_uri]) + def _check3((out,err)): + lines = [l.strip() for l in out.split("\n") if l] + self.failUnless("count-immutable-files: 1" in lines) + self.failUnless("count-mutable-files: 1" in lines) + self.failUnless("count-literal-files: 2" in lines) + self.failUnless("count-files: 4" in lines) + self.failUnless("count-directories: 1" in lines) + self.failUnless("size-immutable-files: 13000 (13.00 kB, 12.70 kiB)" in lines, lines) + self.failUnless("size-literal-files: 48" in lines) + self.failUnless(" 11-31 : 2 (31 B, 31 B)".strip() in lines) + self.failUnless("10001-31622 : 1 (31.62 kB, 30.88 kiB)".strip() in lines) + d.addCallback(_check3) + return d + + def do_cli_stats2(self): + basedir = self.getdir("client0") + d = self._run_cli(["stats", + "--node-directory", basedir, + "--raw", + self.root_uri]) + def _check4((out,err)): + data = simplejson.loads(out) + self.failUnlessEqual(data["count-immutable-files"], 1) + self.failUnlessEqual(data["count-immutable-files"], 1) + self.failUnlessEqual(data["count-mutable-files"], 1) + self.failUnlessEqual(data["count-literal-files"], 2) + self.failUnlessEqual(data["count-files"], 4) + self.failUnlessEqual(data["count-directories"], 1) + self.failUnlessEqual(data["size-immutable-files"], 13000) + self.failUnlessEqual(data["size-literal-files"], 48) + self.failUnless([11,31,2] in data["size-files-histogram"]) + self.failUnless([10001,31622,1] in data["size-files-histogram"]) + d.addCallback(_check4) + return d + + +class DeepCheckWebBad(DeepCheckBase, unittest.TestCase): + + def test_bad(self): + self.basedir = self.mktemp() + d = self.set_up_nodes() + d.addCallback(self.set_up_damaged_tree) + d.addCallback(self.do_check) + d.addCallback(self.do_deepcheck) + d.addCallback(self.do_test_web_bad) + d.addErrback(self.explain_web_error) + d.addErrback(self.explain_error) + return d + + + + def set_up_damaged_tree(self, ignored): + # 6.4s + + # root + # mutable-good + # mutable-missing-shares + # mutable-corrupt-shares + # mutable-unrecoverable + # large-good + # large-missing-shares + # large-corrupt-shares + # large-unrecoverable + + self.nodes = {} + + c0 = self.clients[0] + d = c0.create_empty_dirnode() + def _created_root(n): + self.root = n + self.root_uri = n.get_uri() + d.addCallback(_created_root) + d.addCallback(self.create_mangled, "mutable-good") + d.addCallback(self.create_mangled, "mutable-missing-shares") + d.addCallback(self.create_mangled, "mutable-corrupt-shares") + d.addCallback(self.create_mangled, "mutable-unrecoverable") + d.addCallback(self.create_mangled, "large-good") + d.addCallback(self.create_mangled, "large-missing-shares") + d.addCallback(self.create_mangled, "large-corrupt-shares") + d.addCallback(self.create_mangled, "large-unrecoverable") + + return d + + + def create_mangled(self, ignored, name): + nodetype, mangletype = name.split("-", 1) + if nodetype == "mutable": + d = self.clients[0].create_mutable_file("mutable file contents") + d.addCallback(lambda n: self.root.set_node(unicode(name), n)) + elif nodetype == "large": + large = upload.Data("Lots of data\n" * 1000 + name + "\n", None) + d = self.root.add_file(unicode(name), large) + elif nodetype == "small": + small = upload.Data("Small enough for a LIT", None) + d = self.root.add_file(unicode(name), small) + + def _stash_node(node): + self.nodes[name] = node + return node + d.addCallback(_stash_node) + + if mangletype == "good": + pass + elif mangletype == "missing-shares": + d.addCallback(self._delete_some_shares) + elif mangletype == "corrupt-shares": + d.addCallback(self._corrupt_some_shares) + else: + assert mangletype == "unrecoverable" + d.addCallback(self._delete_most_shares) + + return d + + def _run_cli(self, argv): + stdout, stderr = StringIO(), StringIO() + # this can only do synchronous operations + assert argv[0] == "debug" + runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr) + return stdout.getvalue() + + def _find_shares(self, node): + si = node.get_storage_index() + out = self._run_cli(["debug", "find-shares", base32.b2a(si)] + + [c.basedir for c in self.clients]) + files = out.split("\n") + return [f for f in files if f] + + def _delete_some_shares(self, node): + shares = self._find_shares(node) + os.unlink(shares[0]) + os.unlink(shares[1]) + + def _corrupt_some_shares(self, node): + shares = self._find_shares(node) + self._run_cli(["debug", "corrupt-share", shares[0]]) + self._run_cli(["debug", "corrupt-share", shares[1]]) + + def _delete_most_shares(self, node): + shares = self._find_shares(node) + for share in shares[1:]: + os.unlink(share) + + + def check_is_healthy(self, cr, where): + try: + self.failUnless(ICheckResults.providedBy(cr), (cr, type(cr), where)) + self.failUnless(cr.is_healthy(), (cr.get_report(), cr.is_healthy(), cr.get_summary(), where)) + self.failUnless(cr.is_recoverable(), where) + d = cr.get_data() + self.failUnlessEqual(d["count-recoverable-versions"], 1, where) + self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where) + return cr + except Exception, le: + le.args = tuple(le.args + (where,)) + raise + + def check_is_missing_shares(self, cr, where): + self.failUnless(ICheckResults.providedBy(cr), where) + self.failIf(cr.is_healthy(), where) + self.failUnless(cr.is_recoverable(), where) + d = cr.get_data() + self.failUnlessEqual(d["count-recoverable-versions"], 1, where) + self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where) + return cr + + def check_has_corrupt_shares(self, cr, where): + # by "corrupt-shares" we mean the file is still recoverable + self.failUnless(ICheckResults.providedBy(cr), where) + d = cr.get_data() + self.failIf(cr.is_healthy(), (where, cr)) + self.failUnless(cr.is_recoverable(), where) + d = cr.get_data() + self.failUnless(d["count-shares-good"] < 10, where) + self.failUnless(d["count-corrupt-shares"], where) + self.failUnless(d["list-corrupt-shares"], where) + return cr + + def check_is_unrecoverable(self, cr, where): + self.failUnless(ICheckResults.providedBy(cr), where) + d = cr.get_data() + self.failIf(cr.is_healthy(), where) + self.failIf(cr.is_recoverable(), where) + self.failUnless(d["count-shares-good"] < d["count-shares-needed"], (d["count-shares-good"], d["count-shares-needed"], where)) + self.failUnlessEqual(d["count-recoverable-versions"], 0, where) + self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where) + return cr + + def do_check(self, ignored): + d = defer.succeed(None) + + # check the individual items, without verification. This will not + # detect corrupt shares. + def _check(which, checker): + d = self.nodes[which].check(Monitor()) + d.addCallback(checker, which + "--check") + return d + + d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy)) + d.addCallback(lambda ign: _check("mutable-missing-shares", + self.check_is_missing_shares)) + d.addCallback(lambda ign: _check("mutable-corrupt-shares", + self.check_is_healthy)) + d.addCallback(lambda ign: _check("mutable-unrecoverable", + self.check_is_unrecoverable)) + d.addCallback(lambda ign: _check("large-good", self.check_is_healthy)) + d.addCallback(lambda ign: _check("large-missing-shares", + self.check_is_missing_shares)) + d.addCallback(lambda ign: _check("large-corrupt-shares", + self.check_is_healthy)) + d.addCallback(lambda ign: _check("large-unrecoverable", + self.check_is_unrecoverable)) + + # and again with verify=True, which *does* detect corrupt shares. + def _checkv(which, checker): + d = self.nodes[which].check(Monitor(), verify=True) + d.addCallback(checker, which + "--check-and-verify") + return d + + d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy)) + d.addCallback(lambda ign: _checkv("mutable-missing-shares", + self.check_is_missing_shares)) + d.addCallback(lambda ign: _checkv("mutable-corrupt-shares", + self.check_has_corrupt_shares)) + d.addCallback(lambda ign: _checkv("mutable-unrecoverable", + self.check_is_unrecoverable)) + d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy)) + d.addCallback(lambda ign: _checkv("large-missing-shares", self.check_is_missing_shares)) + d.addCallback(lambda ign: _checkv("large-corrupt-shares", self.check_has_corrupt_shares)) + d.addCallback(lambda ign: _checkv("large-unrecoverable", + self.check_is_unrecoverable)) + + return d + + def do_deepcheck(self, ignored): + d = defer.succeed(None) + + # now deep-check the root, with various verify= and repair= options + d.addCallback(lambda ign: + self.root.start_deep_check().when_done()) + def _check1(cr): + self.failUnless(IDeepCheckResults.providedBy(cr)) + c = cr.get_counters() + self.failUnlessEqual(c["count-objects-checked"], 9) + self.failUnlessEqual(c["count-objects-healthy"], 5) + self.failUnlessEqual(c["count-objects-unhealthy"], 4) + self.failUnlessEqual(c["count-objects-unrecoverable"], 2) + d.addCallback(_check1) + + d.addCallback(lambda ign: + self.root.start_deep_check(verify=True).when_done()) + def _check2(cr): + self.failUnless(IDeepCheckResults.providedBy(cr)) + c = cr.get_counters() + self.failUnlessEqual(c["count-objects-checked"], 9) + self.failUnlessEqual(c["count-objects-healthy"], 3) + self.failUnlessEqual(c["count-objects-unhealthy"], 6) + self.failUnlessEqual(c["count-objects-healthy"], 3) # root, mutable good, large good + self.failUnlessEqual(c["count-objects-unrecoverable"], 2) # mutable unrecoverable, large unrecoverable + d.addCallback(_check2) + + return d + + def json_is_healthy(self, data, where): + r = data["results"] + self.failUnless(r["healthy"], where) + self.failUnless(r["recoverable"], where) + self.failUnlessEqual(r["count-recoverable-versions"], 1, where) + self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where) + + def json_is_missing_shares(self, data, where): + r = data["results"] + self.failIf(r["healthy"], where) + self.failUnless(r["recoverable"], where) + self.failUnlessEqual(r["count-recoverable-versions"], 1, where) + self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where) + + def json_has_corrupt_shares(self, data, where): + # by "corrupt-shares" we mean the file is still recoverable + r = data["results"] + self.failIf(r["healthy"], where) + self.failUnless(r["recoverable"], where) + self.failUnless(r["count-shares-good"] < 10, where) + self.failUnless(r["count-corrupt-shares"], where) + self.failUnless(r["list-corrupt-shares"], where) + + def json_is_unrecoverable(self, data, where): + r = data["results"] + self.failIf(r["healthy"], where) + self.failIf(r["recoverable"], where) + self.failUnless(r["count-shares-good"] < r["count-shares-needed"], + where) + self.failUnlessEqual(r["count-recoverable-versions"], 0, where) + self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where) + + def do_test_web_bad(self, ignored): + d = defer.succeed(None) + + # check, no verify + def _check(which, checker): + d = self.web_json(self.nodes[which], t="check") + d.addCallback(checker, which + "--webcheck") + return d + + d.addCallback(lambda ign: _check("mutable-good", + self.json_is_healthy)) + d.addCallback(lambda ign: _check("mutable-missing-shares", + self.json_is_missing_shares)) + d.addCallback(lambda ign: _check("mutable-corrupt-shares", + self.json_is_healthy)) + d.addCallback(lambda ign: _check("mutable-unrecoverable", + self.json_is_unrecoverable)) + d.addCallback(lambda ign: _check("large-good", + self.json_is_healthy)) + d.addCallback(lambda ign: _check("large-missing-shares", + self.json_is_missing_shares)) + d.addCallback(lambda ign: _check("large-corrupt-shares", + self.json_is_healthy)) + d.addCallback(lambda ign: _check("large-unrecoverable", + self.json_is_unrecoverable)) + + # check and verify + def _checkv(which, checker): + d = self.web_json(self.nodes[which], t="check", verify="true") + d.addCallback(checker, which + "--webcheck-and-verify") + return d + + d.addCallback(lambda ign: _checkv("mutable-good", + self.json_is_healthy)) + d.addCallback(lambda ign: _checkv("mutable-missing-shares", + self.json_is_missing_shares)) + d.addCallback(lambda ign: _checkv("mutable-corrupt-shares", + self.json_has_corrupt_shares)) + d.addCallback(lambda ign: _checkv("mutable-unrecoverable", + self.json_is_unrecoverable)) + d.addCallback(lambda ign: _checkv("large-good", + self.json_is_healthy)) + d.addCallback(lambda ign: _checkv("large-missing-shares", self.json_is_missing_shares)) + d.addCallback(lambda ign: _checkv("large-corrupt-shares", self.json_has_corrupt_shares)) + d.addCallback(lambda ign: _checkv("large-unrecoverable", + self.json_is_unrecoverable)) + + return d diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 07a4ec55..5f7db1d6 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -1,5 +1,5 @@ from base64 import b32encode -import os, sys, time, re, simplejson, urllib +import os, sys, time, re, simplejson from cStringIO import StringIO from zope.interface import implements from twisted.trial import unittest @@ -16,9 +16,8 @@ from allmydata.util import idlib, mathutil from allmydata.util import log, base32 from allmydata.scripts import runner from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \ - ICheckResults, ICheckAndRepairResults, IDeepCheckResults, \ - IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError -from allmydata.monitor import Monitor, OperationCancelledError + NoSuchChildError, NotEnoughSharesError +from allmydata.monitor import Monitor from allmydata.mutable.common import NotMutableError from allmydata.mutable import layout as mutable_layout from foolscap import DeadReferenceError @@ -26,8 +25,8 @@ from twisted.python.failure import Failure from twisted.web.client import getPage from twisted.web.error import Error -from allmydata.test.common import SystemTestMixin, ErrorMixin, \ - MemoryConsumer, download_to_data +from allmydata.test.common import SystemTestMixin, MemoryConsumer, \ + download_to_data LARGE_DATA = """ This is some data to publish to the virtual drive, which needs to be large @@ -1785,1144 +1784,3 @@ class SystemTest(SystemTestMixin, unittest.TestCase): d.addCallback(_got_lit_filenode) return d - -class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin): - - def _run_cli(self, argv): - stdout, stderr = StringIO(), StringIO() - # this can only do synchronous operations - assert argv[0] == "debug" - runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr) - return stdout.getvalue() - - def test_good(self): - self.basedir = self.mktemp() - d = self.set_up_nodes() - CONTENTS = "a little bit of data" - d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS)) - def _created(node): - self.node = node - si = self.node.get_storage_index() - d.addCallback(_created) - # now make sure the webapi verifier sees no problems - def _do_check(res): - url = (self.webish_url + - "uri/%s" % urllib.quote(self.node.get_uri()) + - "?t=check&verify=true") - return getPage(url, method="POST") - d.addCallback(_do_check) - def _got_results(out): - self.failUnless("Healthy : Healthy" in out, out) - self.failUnless("Recoverable Versions: 10*seq1-" in out, out) - self.failIf("Not Healthy!" in out, out) - self.failIf("Unhealthy" in out, out) - self.failIf("Corrupt Shares" in out, out) - d.addCallback(_got_results) - d.addErrback(self.explain_web_error) - return d - - def test_corrupt(self): - self.basedir = self.mktemp() - d = self.set_up_nodes() - CONTENTS = "a little bit of data" - d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS)) - def _created(node): - self.node = node - si = self.node.get_storage_index() - out = self._run_cli(["debug", "find-shares", base32.b2a(si), - self.clients[1].basedir]) - files = out.split("\n") - # corrupt one of them, using the CLI debug command - f = files[0] - shnum = os.path.basename(f) - nodeid = self.clients[1].nodeid - nodeid_prefix = idlib.shortnodeid_b2a(nodeid) - self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum) - out = self._run_cli(["debug", "corrupt-share", files[0]]) - d.addCallback(_created) - # now make sure the webapi verifier notices it - def _do_check(res): - url = (self.webish_url + - "uri/%s" % urllib.quote(self.node.get_uri()) + - "?t=check&verify=true") - return getPage(url, method="POST") - d.addCallback(_do_check) - def _got_results(out): - self.failUnless("Not Healthy!" in out, out) - self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out) - self.failUnless("Corrupt Shares:" in out, out) - d.addCallback(_got_results) - - # now make sure the webapi repairer can fix it - def _do_repair(res): - url = (self.webish_url + - "uri/%s" % urllib.quote(self.node.get_uri()) + - "?t=check&verify=true&repair=true") - return getPage(url, method="POST") - d.addCallback(_do_repair) - def _got_repair_results(out): - self.failUnless("
Repair successful
" in out, out) - d.addCallback(_got_repair_results) - d.addCallback(_do_check) - def _got_postrepair_results(out): - self.failIf("Not Healthy!" in out, out) - self.failUnless("Recoverable Versions: 10*seq" in out, out) - d.addCallback(_got_postrepair_results) - d.addErrback(self.explain_web_error) - - return d - - def test_delete_share(self): - self.basedir = self.mktemp() - d = self.set_up_nodes() - CONTENTS = "a little bit of data" - d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS)) - def _created(node): - self.node = node - si = self.node.get_storage_index() - out = self._run_cli(["debug", "find-shares", base32.b2a(si), - self.clients[1].basedir]) - files = out.split("\n") - # corrupt one of them, using the CLI debug command - f = files[0] - shnum = os.path.basename(f) - nodeid = self.clients[1].nodeid - nodeid_prefix = idlib.shortnodeid_b2a(nodeid) - self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum) - os.unlink(files[0]) - d.addCallback(_created) - # now make sure the webapi checker notices it - def _do_check(res): - url = (self.webish_url + - "uri/%s" % urllib.quote(self.node.get_uri()) + - "?t=check&verify=false") - return getPage(url, method="POST") - d.addCallback(_do_check) - def _got_results(out): - self.failUnless("Not Healthy!" in out, out) - self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out) - self.failIf("Corrupt Shares" in out, out) - d.addCallback(_got_results) - - # now make sure the webapi repairer can fix it - def _do_repair(res): - url = (self.webish_url + - "uri/%s" % urllib.quote(self.node.get_uri()) + - "?t=check&verify=false&repair=true") - return getPage(url, method="POST") - d.addCallback(_do_repair) - def _got_repair_results(out): - self.failUnless("Repair successful" in out) - d.addCallback(_got_repair_results) - d.addCallback(_do_check) - def _got_postrepair_results(out): - self.failIf("Not Healthy!" in out, out) - self.failUnless("Recoverable Versions: 10*seq" in out) - d.addCallback(_got_postrepair_results) - d.addErrback(self.explain_web_error) - - return d - - -class DeepCheckBase(SystemTestMixin, ErrorMixin): - - def web_json(self, n, **kwargs): - kwargs["output"] = "json" - d = self.web(n, "POST", **kwargs) - d.addCallback(self.decode_json) - return d - - def decode_json(self, (s,url)): - try: - data = simplejson.loads(s) - except ValueError: - self.fail("%s: not JSON: '%s'" % (url, s)) - return data - - def parse_streamed_json(self, s): - for unit in s.split("\n"): - if not unit: - # stream should end with a newline, so split returns "" - continue - yield simplejson.loads(unit) - - def web(self, n, method="GET", **kwargs): - # returns (data, url) - url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri()) - + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()])) - d = getPage(url, method=method) - d.addCallback(lambda data: (data,url)) - return d - - def wait_for_operation(self, ignored, ophandle): - url = self.webish_url + "operations/" + ophandle - url += "?t=status&output=JSON" - d = getPage(url) - def _got(res): - try: - data = simplejson.loads(res) - except ValueError: - self.fail("%s: not JSON: '%s'" % (url, res)) - if not data["finished"]: - d = self.stall(delay=1.0) - d.addCallback(self.wait_for_operation, ophandle) - return d - return data - d.addCallback(_got) - return d - - def get_operation_results(self, ignored, ophandle, output=None): - url = self.webish_url + "operations/" + ophandle - url += "?t=status" - if output: - url += "&output=" + output - d = getPage(url) - def _got(res): - if output and output.lower() == "json": - try: - return simplejson.loads(res) - except ValueError: - self.fail("%s: not JSON: '%s'" % (url, res)) - return res - d.addCallback(_got) - return d - - def slow_web(self, n, output=None, **kwargs): - # use ophandle= - handle = base32.b2a(os.urandom(4)) - d = self.web(n, "POST", ophandle=handle, **kwargs) - d.addCallback(self.wait_for_operation, handle) - d.addCallback(self.get_operation_results, handle, output=output) - return d - - -class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): - # construct a small directory tree (with one dir, one immutable file, one - # mutable file, one LIT file, and a loop), and then check/examine it in - # various ways. - - def set_up_tree(self, ignored): - # 2.9s - - # root - # mutable - # large - # small - # small2 - # loop -> root - c0 = self.clients[0] - d = c0.create_empty_dirnode() - def _created_root(n): - self.root = n - self.root_uri = n.get_uri() - d.addCallback(_created_root) - d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents")) - d.addCallback(lambda n: self.root.set_node(u"mutable", n)) - def _created_mutable(n): - self.mutable = n - self.mutable_uri = n.get_uri() - d.addCallback(_created_mutable) - - large = upload.Data("Lots of data\n" * 1000, None) - d.addCallback(lambda ign: self.root.add_file(u"large", large)) - def _created_large(n): - self.large = n - self.large_uri = n.get_uri() - d.addCallback(_created_large) - - small = upload.Data("Small enough for a LIT", None) - d.addCallback(lambda ign: self.root.add_file(u"small", small)) - def _created_small(n): - self.small = n - self.small_uri = n.get_uri() - d.addCallback(_created_small) - - small2 = upload.Data("Small enough for a LIT too", None) - d.addCallback(lambda ign: self.root.add_file(u"small2", small2)) - def _created_small2(n): - self.small2 = n - self.small2_uri = n.get_uri() - d.addCallback(_created_small2) - - d.addCallback(lambda ign: self.root.set_node(u"loop", self.root)) - return d - - def check_is_healthy(self, cr, n, where, incomplete=False): - self.failUnless(ICheckResults.providedBy(cr), where) - self.failUnless(cr.is_healthy(), where) - self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(), - where) - self.failUnlessEqual(cr.get_storage_index_string(), - base32.b2a(n.get_storage_index()), where) - needs_rebalancing = bool( len(self.clients) < 10 ) - if not incomplete: - self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, str((where, cr, cr.get_data()))) - d = cr.get_data() - self.failUnlessEqual(d["count-shares-good"], 10, where) - self.failUnlessEqual(d["count-shares-needed"], 3, where) - self.failUnlessEqual(d["count-shares-expected"], 10, where) - if not incomplete: - self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where) - self.failUnlessEqual(d["count-corrupt-shares"], 0, where) - self.failUnlessEqual(d["list-corrupt-shares"], [], where) - if not incomplete: - self.failUnlessEqual(sorted(d["servers-responding"]), - sorted([c.nodeid for c in self.clients]), - where) - self.failUnless("sharemap" in d, str((where, d))) - all_serverids = set() - for (shareid, serverids) in d["sharemap"].items(): - all_serverids.update(serverids) - self.failUnlessEqual(sorted(all_serverids), - sorted([c.nodeid for c in self.clients]), - where) - - self.failUnlessEqual(d["count-wrong-shares"], 0, where) - self.failUnlessEqual(d["count-recoverable-versions"], 1, where) - self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where) - - - def check_and_repair_is_healthy(self, cr, n, where, incomplete=False): - self.failUnless(ICheckAndRepairResults.providedBy(cr), (where, cr)) - self.failUnless(cr.get_pre_repair_results().is_healthy(), where) - self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete) - self.failUnless(cr.get_post_repair_results().is_healthy(), where) - self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete) - self.failIf(cr.get_repair_attempted(), where) - - def deep_check_is_healthy(self, cr, num_healthy, where): - self.failUnless(IDeepCheckResults.providedBy(cr)) - self.failUnlessEqual(cr.get_counters()["count-objects-healthy"], - num_healthy, where) - - def deep_check_and_repair_is_healthy(self, cr, num_healthy, where): - self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where) - c = cr.get_counters() - self.failUnlessEqual(c["count-objects-healthy-pre-repair"], - num_healthy, where) - self.failUnlessEqual(c["count-objects-healthy-post-repair"], - num_healthy, where) - self.failUnlessEqual(c["count-repairs-attempted"], 0, where) - - def test_good(self): - self.basedir = self.mktemp() - d = self.set_up_nodes() - d.addCallback(self.set_up_tree) - d.addCallback(self.do_stats) - d.addCallback(self.do_web_stream_manifest) - d.addCallback(self.do_web_stream_check) - d.addCallback(self.do_test_check_good) - d.addCallback(self.do_test_web_good) - d.addCallback(self.do_test_cli_good) - d.addErrback(self.explain_web_error) - d.addErrback(self.explain_error) - return d - - def do_stats(self, ignored): - d = defer.succeed(None) - d.addCallback(lambda ign: self.root.start_deep_stats().when_done()) - d.addCallback(self.check_stats_good) - return d - - def check_stats_good(self, s): - self.failUnlessEqual(s["count-directories"], 1) - self.failUnlessEqual(s["count-files"], 4) - self.failUnlessEqual(s["count-immutable-files"], 1) - self.failUnlessEqual(s["count-literal-files"], 2) - self.failUnlessEqual(s["count-mutable-files"], 1) - # don't check directories: their size will vary - # s["largest-directory"] - # s["size-directories"] - self.failUnlessEqual(s["largest-directory-children"], 5) - self.failUnlessEqual(s["largest-immutable-file"], 13000) - # to re-use this function for both the local - # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we - # coerce the result into a list of tuples. dirnode.start_deep_stats() - # returns a list of tuples, but JSON only knows about lists., so - # t=start-deep-stats returns a list of lists. - histogram = [tuple(stuff) for stuff in s["size-files-histogram"]] - self.failUnlessEqual(histogram, [(11, 31, 2), - (10001, 31622, 1), - ]) - self.failUnlessEqual(s["size-immutable-files"], 13000) - self.failUnlessEqual(s["size-literal-files"], 48) - - def do_web_stream_manifest(self, ignored): - d = self.web(self.root, method="POST", t="stream-manifest") - d.addCallback(lambda (output,url): - self._check_streamed_manifest(output)) - return d - - def _check_streamed_manifest(self, output): - units = list(self.parse_streamed_json(output)) - files = [u for u in units if u["type"] in ("file", "directory")] - assert units[-1]["type"] == "stats" - stats = units[-1]["stats"] - self.failUnlessEqual(len(files), 5) - # [root,mutable,large] are distributed, [small,small2] are not - self.failUnlessEqual(len([f for f in files - if f["verifycap"] is not None]), 3) - self.failUnlessEqual(len([f for f in files - if f["verifycap"] is None]), 2) - self.failUnlessEqual(len([f for f in files - if f["repaircap"] is not None]), 3) - self.failUnlessEqual(len([f for f in files - if f["repaircap"] is None]), 2) - self.failUnlessEqual(len([f for f in files - if f["storage-index"] is not None]), 3) - self.failUnlessEqual(len([f for f in files - if f["storage-index"] is None]), 2) - # make sure that a mutable file has filecap==repaircap!=verifycap - mutable = [f for f in files - if f["cap"] is not None - and f["cap"].startswith("URI:SSK:")][0] - self.failUnlessEqual(mutable["cap"], self.mutable_uri) - self.failIfEqual(mutable["cap"], mutable["verifycap"]) - self.failUnlessEqual(mutable["cap"], mutable["repaircap"]) - # for immutable file, verifycap==repaircap!=filecap - large = [f for f in files - if f["cap"] is not None - and f["cap"].startswith("URI:CHK:")][0] - self.failUnlessEqual(large["cap"], self.large_uri) - self.failIfEqual(large["cap"], large["verifycap"]) - self.failUnlessEqual(large["verifycap"], large["repaircap"]) - self.check_stats_good(stats) - - def do_web_stream_check(self, ignored): - return - d = self.web(self.root, t="stream-deep-check") - def _check(res): - units = list(self.parse_streamed_json(res)) - files = [u for u in units if u["type"] in ("file", "directory")] - assert units[-1]["type"] == "stats" - stats = units[-1]["stats"] - # ... - d.addCallback(_check) - return d - - def do_test_check_good(self, ignored): - d = defer.succeed(None) - # check the individual items - d.addCallback(lambda ign: self.root.check(Monitor())) - d.addCallback(self.check_is_healthy, self.root, "root") - d.addCallback(lambda ign: self.mutable.check(Monitor())) - d.addCallback(self.check_is_healthy, self.mutable, "mutable") - d.addCallback(lambda ign: self.large.check(Monitor())) - d.addCallback(self.check_is_healthy, self.large, "large") - d.addCallback(lambda ign: self.small.check(Monitor())) - d.addCallback(self.failUnlessEqual, None, "small") - d.addCallback(lambda ign: self.small2.check(Monitor())) - d.addCallback(self.failUnlessEqual, None, "small2") - - # and again with verify=True - d.addCallback(lambda ign: self.root.check(Monitor(), verify=True)) - d.addCallback(self.check_is_healthy, self.root, "root") - d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True)) - d.addCallback(self.check_is_healthy, self.mutable, "mutable") - d.addCallback(lambda ign: self.large.check(Monitor(), verify=True)) - d.addCallback(self.check_is_healthy, self.large, "large", incomplete=True) - d.addCallback(lambda ign: self.small.check(Monitor(), verify=True)) - d.addCallback(self.failUnlessEqual, None, "small") - d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True)) - d.addCallback(self.failUnlessEqual, None, "small2") - - # and check_and_repair(), which should be a nop - d.addCallback(lambda ign: self.root.check_and_repair(Monitor())) - d.addCallback(self.check_and_repair_is_healthy, self.root, "root") - d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor())) - d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable") - #TODO d.addCallback(lambda ign: self.large.check_and_repair(Monitor())) - #TODO d.addCallback(self.check_and_repair_is_healthy, self.large, "large") - #TODO d.addCallback(lambda ign: self.small.check_and_repair(Monitor())) - #TODO d.addCallback(self.failUnlessEqual, None, "small") - #TODO d.addCallback(lambda ign: self.small2.check_and_repair(Monitor())) - #TODO d.addCallback(self.failUnlessEqual, None, "small2") - - # check_and_repair(verify=True) - d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True)) - d.addCallback(self.check_and_repair_is_healthy, self.root, "root") - d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True)) - d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable") - #TODO d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True)) - #TODO d.addCallback(self.check_and_repair_is_healthy, self.large, "large", - #TODO incomplete=True) - #TODO d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True)) - #TODO d.addCallback(self.failUnlessEqual, None, "small") - #TODO d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True)) - #TODO d.addCallback(self.failUnlessEqual, None, "small2") - - - # now deep-check the root, with various verify= and repair= options - d.addCallback(lambda ign: - self.root.start_deep_check().when_done()) - d.addCallback(self.deep_check_is_healthy, 3, "root") - d.addCallback(lambda ign: - self.root.start_deep_check(verify=True).when_done()) - d.addCallback(self.deep_check_is_healthy, 3, "root") - d.addCallback(lambda ign: - self.root.start_deep_check_and_repair().when_done()) - d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root") - d.addCallback(lambda ign: - self.root.start_deep_check_and_repair(verify=True).when_done()) - d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root") - - # and finally, start a deep-check, but then cancel it. - d.addCallback(lambda ign: self.root.start_deep_check()) - def _checking(monitor): - monitor.cancel() - d = monitor.when_done() - # this should fire as soon as the next dirnode.list finishes. - # TODO: add a counter to measure how many list() calls are made, - # assert that no more than one gets to run before the cancel() - # takes effect. - def _finished_normally(res): - self.fail("this was supposed to fail, not finish normally") - def _cancelled(f): - f.trap(OperationCancelledError) - d.addCallbacks(_finished_normally, _cancelled) - return d - d.addCallback(_checking) - - return d - - def json_check_is_healthy(self, data, n, where, incomplete=False): - - self.failUnlessEqual(data["storage-index"], - base32.b2a(n.get_storage_index()), where) - self.failUnless("summary" in data, (where, data)) - self.failUnlessEqual(data["summary"].lower(), "healthy", - "%s: '%s'" % (where, data["summary"])) - r = data["results"] - self.failUnlessEqual(r["healthy"], True, where) - needs_rebalancing = bool( len(self.clients) < 10 ) - if not incomplete: - self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where) - self.failUnlessEqual(r["count-shares-good"], 10, where) - self.failUnlessEqual(r["count-shares-needed"], 3, where) - self.failUnlessEqual(r["count-shares-expected"], 10, where) - if not incomplete: - self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where) - self.failUnlessEqual(r["count-corrupt-shares"], 0, where) - self.failUnlessEqual(r["list-corrupt-shares"], [], where) - if not incomplete: - self.failUnlessEqual(sorted(r["servers-responding"]), - sorted([idlib.nodeid_b2a(c.nodeid) - for c in self.clients]), where) - self.failUnless("sharemap" in r, where) - all_serverids = set() - for (shareid, serverids_s) in r["sharemap"].items(): - all_serverids.update(serverids_s) - self.failUnlessEqual(sorted(all_serverids), - sorted([idlib.nodeid_b2a(c.nodeid) - for c in self.clients]), where) - self.failUnlessEqual(r["count-wrong-shares"], 0, where) - self.failUnlessEqual(r["count-recoverable-versions"], 1, where) - self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where) - - def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False): - self.failUnlessEqual(data["storage-index"], - base32.b2a(n.get_storage_index()), where) - self.failUnlessEqual(data["repair-attempted"], False, where) - self.json_check_is_healthy(data["pre-repair-results"], - n, where, incomplete) - self.json_check_is_healthy(data["post-repair-results"], - n, where, incomplete) - - def json_full_deepcheck_is_healthy(self, data, n, where): - self.failUnlessEqual(data["root-storage-index"], - base32.b2a(n.get_storage_index()), where) - self.failUnlessEqual(data["count-objects-checked"], 3, where) - self.failUnlessEqual(data["count-objects-healthy"], 3, where) - self.failUnlessEqual(data["count-objects-unhealthy"], 0, where) - self.failUnlessEqual(data["count-corrupt-shares"], 0, where) - self.failUnlessEqual(data["list-corrupt-shares"], [], where) - self.failUnlessEqual(data["list-unhealthy-files"], [], where) - self.json_check_stats_good(data["stats"], where) - - def json_full_deepcheck_and_repair_is_healthy(self, data, n, where): - self.failUnlessEqual(data["root-storage-index"], - base32.b2a(n.get_storage_index()), where) - self.failUnlessEqual(data["count-objects-checked"], 3, where) - - self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where) - self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where) - self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where) - - self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where) - self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where) - self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where) - - self.failUnlessEqual(data["list-corrupt-shares"], [], where) - self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where) - self.failUnlessEqual(data["list-unhealthy-files"], [], where) - - self.failUnlessEqual(data["count-repairs-attempted"], 0, where) - self.failUnlessEqual(data["count-repairs-successful"], 0, where) - self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where) - - - def json_check_lit(self, data, n, where): - self.failUnlessEqual(data["storage-index"], "", where) - self.failUnlessEqual(data["results"]["healthy"], True, where) - - def json_check_stats_good(self, data, where): - self.check_stats_good(data) - - def do_test_web_good(self, ignored): - d = defer.succeed(None) - - # stats - d.addCallback(lambda ign: - self.slow_web(self.root, - t="start-deep-stats", output="json")) - d.addCallback(self.json_check_stats_good, "deep-stats") - - # check, no verify - d.addCallback(lambda ign: self.web_json(self.root, t="check")) - d.addCallback(self.json_check_is_healthy, self.root, "root") - d.addCallback(lambda ign: self.web_json(self.mutable, t="check")) - d.addCallback(self.json_check_is_healthy, self.mutable, "mutable") - d.addCallback(lambda ign: self.web_json(self.large, t="check")) - d.addCallback(self.json_check_is_healthy, self.large, "large") - d.addCallback(lambda ign: self.web_json(self.small, t="check")) - d.addCallback(self.json_check_lit, self.small, "small") - d.addCallback(lambda ign: self.web_json(self.small2, t="check")) - d.addCallback(self.json_check_lit, self.small2, "small2") - - # check and verify - d.addCallback(lambda ign: - self.web_json(self.root, t="check", verify="true")) - d.addCallback(self.json_check_is_healthy, self.root, "root+v") - d.addCallback(lambda ign: - self.web_json(self.mutable, t="check", verify="true")) - d.addCallback(self.json_check_is_healthy, self.mutable, "mutable+v") - d.addCallback(lambda ign: - self.web_json(self.large, t="check", verify="true")) - d.addCallback(self.json_check_is_healthy, self.large, "large+v", - incomplete=True) - d.addCallback(lambda ign: - self.web_json(self.small, t="check", verify="true")) - d.addCallback(self.json_check_lit, self.small, "small+v") - d.addCallback(lambda ign: - self.web_json(self.small2, t="check", verify="true")) - d.addCallback(self.json_check_lit, self.small2, "small2+v") - - # check and repair, no verify - d.addCallback(lambda ign: - self.web_json(self.root, t="check", repair="true")) - d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+r") - d.addCallback(lambda ign: - self.web_json(self.mutable, t="check", repair="true")) - d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+r") - d.addCallback(lambda ign: - self.web_json(self.large, t="check", repair="true")) - d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+r") - d.addCallback(lambda ign: - self.web_json(self.small, t="check", repair="true")) - d.addCallback(self.json_check_lit, self.small, "small+r") - d.addCallback(lambda ign: - self.web_json(self.small2, t="check", repair="true")) - d.addCallback(self.json_check_lit, self.small2, "small2+r") - - # check+verify+repair - d.addCallback(lambda ign: - self.web_json(self.root, t="check", repair="true", verify="true")) - d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+vr") - d.addCallback(lambda ign: - self.web_json(self.mutable, t="check", repair="true", verify="true")) - d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+vr") - d.addCallback(lambda ign: - self.web_json(self.large, t="check", repair="true", verify="true")) - d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+vr", incomplete=True) - d.addCallback(lambda ign: - self.web_json(self.small, t="check", repair="true", verify="true")) - d.addCallback(self.json_check_lit, self.small, "small+vr") - d.addCallback(lambda ign: - self.web_json(self.small2, t="check", repair="true", verify="true")) - d.addCallback(self.json_check_lit, self.small2, "small2+vr") - - # now run a deep-check, with various verify= and repair= flags - d.addCallback(lambda ign: - self.slow_web(self.root, t="start-deep-check", output="json")) - d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+d") - d.addCallback(lambda ign: - self.slow_web(self.root, t="start-deep-check", verify="true", - output="json")) - d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+dv") - d.addCallback(lambda ign: - self.slow_web(self.root, t="start-deep-check", repair="true", - output="json")) - d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dr") - d.addCallback(lambda ign: - self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json")) - d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dvr") - - # now look at t=info - d.addCallback(lambda ign: self.web(self.root, t="info")) - # TODO: examine the output - d.addCallback(lambda ign: self.web(self.mutable, t="info")) - d.addCallback(lambda ign: self.web(self.large, t="info")) - d.addCallback(lambda ign: self.web(self.small, t="info")) - d.addCallback(lambda ign: self.web(self.small2, t="info")) - - return d - - def _run_cli(self, argv, stdin=""): - #print "CLI:", argv - stdout, stderr = StringIO(), StringIO() - d = threads.deferToThread(runner.runner, argv, run_by_human=False, - stdin=StringIO(stdin), - stdout=stdout, stderr=stderr) - def _done(res): - return stdout.getvalue(), stderr.getvalue() - d.addCallback(_done) - return d - - def do_test_cli_good(self, ignored): - d = defer.succeed(None) - d.addCallback(lambda ign: self.do_cli_manifest_stream1()) - d.addCallback(lambda ign: self.do_cli_manifest_stream2()) - d.addCallback(lambda ign: self.do_cli_manifest_stream3()) - d.addCallback(lambda ign: self.do_cli_manifest_stream4()) - d.addCallback(lambda ign: self.do_cli_manifest_stream5()) - d.addCallback(lambda ign: self.do_cli_stats1()) - d.addCallback(lambda ign: self.do_cli_stats2()) - return d - - def _check_manifest_storage_index(self, out): - lines = [l for l in out.split("\n") if l] - self.failUnlessEqual(len(lines), 3) - self.failUnless(base32.b2a(self.root.get_storage_index()) in lines) - self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines) - self.failUnless(base32.b2a(self.large.get_storage_index()) in lines) - - def do_cli_manifest_stream1(self): - basedir = self.getdir("client0") - d = self._run_cli(["manifest", - "--node-directory", basedir, - self.root_uri]) - def _check((out,err)): - self.failUnlessEqual(err, "") - lines = [l for l in out.split("\n") if l] - self.failUnlessEqual(len(lines), 5) - caps = {} - for l in lines: - try: - cap, path = l.split(None, 1) - except ValueError: - cap = l.strip() - path = "" - caps[cap] = path - self.failUnless(self.root.get_uri() in caps) - self.failUnlessEqual(caps[self.root.get_uri()], "") - self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable") - self.failUnlessEqual(caps[self.large.get_uri()], "large") - self.failUnlessEqual(caps[self.small.get_uri()], "small") - self.failUnlessEqual(caps[self.small2.get_uri()], "small2") - d.addCallback(_check) - return d - - def do_cli_manifest_stream2(self): - basedir = self.getdir("client0") - d = self._run_cli(["manifest", - "--node-directory", basedir, - "--raw", - self.root_uri]) - def _check((out,err)): - self.failUnlessEqual(err, "") - # this should be the same as the POST t=stream-manifest output - self._check_streamed_manifest(out) - d.addCallback(_check) - return d - - def do_cli_manifest_stream3(self): - basedir = self.getdir("client0") - d = self._run_cli(["manifest", - "--node-directory", basedir, - "--storage-index", - self.root_uri]) - def _check((out,err)): - self.failUnlessEqual(err, "") - self._check_manifest_storage_index(out) - d.addCallback(_check) - return d - - def do_cli_manifest_stream4(self): - basedir = self.getdir("client0") - d = self._run_cli(["manifest", - "--node-directory", basedir, - "--verify-cap", - self.root_uri]) - def _check((out,err)): - self.failUnlessEqual(err, "") - lines = [l for l in out.split("\n") if l] - self.failUnlessEqual(len(lines), 3) - self.failUnless(self.root.get_verify_cap().to_string() in lines) - self.failUnless(self.mutable.get_verify_cap().to_string() in lines) - self.failUnless(self.large.get_verify_cap().to_string() in lines) - d.addCallback(_check) - return d - - def do_cli_manifest_stream5(self): - basedir = self.getdir("client0") - d = self._run_cli(["manifest", - "--node-directory", basedir, - "--repair-cap", - self.root_uri]) - def _check((out,err)): - self.failUnlessEqual(err, "") - lines = [l for l in out.split("\n") if l] - self.failUnlessEqual(len(lines), 3) - self.failUnless(self.root.get_repair_cap().to_string() in lines) - self.failUnless(self.mutable.get_repair_cap().to_string() in lines) - self.failUnless(self.large.get_repair_cap().to_string() in lines) - d.addCallback(_check) - return d - - def do_cli_stats1(self): - basedir = self.getdir("client0") - d = self._run_cli(["stats", - "--node-directory", basedir, - self.root_uri]) - def _check3((out,err)): - lines = [l.strip() for l in out.split("\n") if l] - self.failUnless("count-immutable-files: 1" in lines) - self.failUnless("count-mutable-files: 1" in lines) - self.failUnless("count-literal-files: 2" in lines) - self.failUnless("count-files: 4" in lines) - self.failUnless("count-directories: 1" in lines) - self.failUnless("size-immutable-files: 13000 (13.00 kB, 12.70 kiB)" in lines, lines) - self.failUnless("size-literal-files: 48" in lines) - self.failUnless(" 11-31 : 2 (31 B, 31 B)".strip() in lines) - self.failUnless("10001-31622 : 1 (31.62 kB, 30.88 kiB)".strip() in lines) - d.addCallback(_check3) - return d - - def do_cli_stats2(self): - basedir = self.getdir("client0") - d = self._run_cli(["stats", - "--node-directory", basedir, - "--raw", - self.root_uri]) - def _check4((out,err)): - data = simplejson.loads(out) - self.failUnlessEqual(data["count-immutable-files"], 1) - self.failUnlessEqual(data["count-immutable-files"], 1) - self.failUnlessEqual(data["count-mutable-files"], 1) - self.failUnlessEqual(data["count-literal-files"], 2) - self.failUnlessEqual(data["count-files"], 4) - self.failUnlessEqual(data["count-directories"], 1) - self.failUnlessEqual(data["size-immutable-files"], 13000) - self.failUnlessEqual(data["size-literal-files"], 48) - self.failUnless([11,31,2] in data["size-files-histogram"]) - self.failUnless([10001,31622,1] in data["size-files-histogram"]) - d.addCallback(_check4) - return d - - -class DeepCheckWebBad(DeepCheckBase, unittest.TestCase): - - def test_bad(self): - self.basedir = self.mktemp() - d = self.set_up_nodes() - d.addCallback(self.set_up_damaged_tree) - d.addCallback(self.do_check) - d.addCallback(self.do_deepcheck) - d.addCallback(self.do_test_web_bad) - d.addErrback(self.explain_web_error) - d.addErrback(self.explain_error) - return d - - - - def set_up_damaged_tree(self, ignored): - # 6.4s - - # root - # mutable-good - # mutable-missing-shares - # mutable-corrupt-shares - # mutable-unrecoverable - # large-good - # large-missing-shares - # large-corrupt-shares - # large-unrecoverable - - self.nodes = {} - - c0 = self.clients[0] - d = c0.create_empty_dirnode() - def _created_root(n): - self.root = n - self.root_uri = n.get_uri() - d.addCallback(_created_root) - d.addCallback(self.create_mangled, "mutable-good") - d.addCallback(self.create_mangled, "mutable-missing-shares") - d.addCallback(self.create_mangled, "mutable-corrupt-shares") - d.addCallback(self.create_mangled, "mutable-unrecoverable") - d.addCallback(self.create_mangled, "large-good") - d.addCallback(self.create_mangled, "large-missing-shares") - d.addCallback(self.create_mangled, "large-corrupt-shares") - d.addCallback(self.create_mangled, "large-unrecoverable") - - return d - - - def create_mangled(self, ignored, name): - nodetype, mangletype = name.split("-", 1) - if nodetype == "mutable": - d = self.clients[0].create_mutable_file("mutable file contents") - d.addCallback(lambda n: self.root.set_node(unicode(name), n)) - elif nodetype == "large": - large = upload.Data("Lots of data\n" * 1000 + name + "\n", None) - d = self.root.add_file(unicode(name), large) - elif nodetype == "small": - small = upload.Data("Small enough for a LIT", None) - d = self.root.add_file(unicode(name), small) - - def _stash_node(node): - self.nodes[name] = node - return node - d.addCallback(_stash_node) - - if mangletype == "good": - pass - elif mangletype == "missing-shares": - d.addCallback(self._delete_some_shares) - elif mangletype == "corrupt-shares": - d.addCallback(self._corrupt_some_shares) - else: - assert mangletype == "unrecoverable" - d.addCallback(self._delete_most_shares) - - return d - - def _run_cli(self, argv): - stdout, stderr = StringIO(), StringIO() - # this can only do synchronous operations - assert argv[0] == "debug" - runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr) - return stdout.getvalue() - - def _find_shares(self, node): - si = node.get_storage_index() - out = self._run_cli(["debug", "find-shares", base32.b2a(si)] + - [c.basedir for c in self.clients]) - files = out.split("\n") - return [f for f in files if f] - - def _delete_some_shares(self, node): - shares = self._find_shares(node) - os.unlink(shares[0]) - os.unlink(shares[1]) - - def _corrupt_some_shares(self, node): - shares = self._find_shares(node) - self._run_cli(["debug", "corrupt-share", shares[0]]) - self._run_cli(["debug", "corrupt-share", shares[1]]) - - def _delete_most_shares(self, node): - shares = self._find_shares(node) - for share in shares[1:]: - os.unlink(share) - - - def check_is_healthy(self, cr, where): - try: - self.failUnless(ICheckResults.providedBy(cr), (cr, type(cr), where)) - self.failUnless(cr.is_healthy(), (cr.get_report(), cr.is_healthy(), cr.get_summary(), where)) - self.failUnless(cr.is_recoverable(), where) - d = cr.get_data() - self.failUnlessEqual(d["count-recoverable-versions"], 1, where) - self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where) - return cr - except Exception, le: - le.args = tuple(le.args + (where,)) - raise - - def check_is_missing_shares(self, cr, where): - self.failUnless(ICheckResults.providedBy(cr), where) - self.failIf(cr.is_healthy(), where) - self.failUnless(cr.is_recoverable(), where) - d = cr.get_data() - self.failUnlessEqual(d["count-recoverable-versions"], 1, where) - self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where) - return cr - - def check_has_corrupt_shares(self, cr, where): - # by "corrupt-shares" we mean the file is still recoverable - self.failUnless(ICheckResults.providedBy(cr), where) - d = cr.get_data() - self.failIf(cr.is_healthy(), (where, cr)) - self.failUnless(cr.is_recoverable(), where) - d = cr.get_data() - self.failUnless(d["count-shares-good"] < 10, where) - self.failUnless(d["count-corrupt-shares"], where) - self.failUnless(d["list-corrupt-shares"], where) - return cr - - def check_is_unrecoverable(self, cr, where): - self.failUnless(ICheckResults.providedBy(cr), where) - d = cr.get_data() - self.failIf(cr.is_healthy(), where) - self.failIf(cr.is_recoverable(), where) - self.failUnless(d["count-shares-good"] < d["count-shares-needed"], (d["count-shares-good"], d["count-shares-needed"], where)) - self.failUnlessEqual(d["count-recoverable-versions"], 0, where) - self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where) - return cr - - def do_check(self, ignored): - d = defer.succeed(None) - - # check the individual items, without verification. This will not - # detect corrupt shares. - def _check(which, checker): - d = self.nodes[which].check(Monitor()) - d.addCallback(checker, which + "--check") - return d - - d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy)) - d.addCallback(lambda ign: _check("mutable-missing-shares", - self.check_is_missing_shares)) - d.addCallback(lambda ign: _check("mutable-corrupt-shares", - self.check_is_healthy)) - d.addCallback(lambda ign: _check("mutable-unrecoverable", - self.check_is_unrecoverable)) - d.addCallback(lambda ign: _check("large-good", self.check_is_healthy)) - d.addCallback(lambda ign: _check("large-missing-shares", - self.check_is_missing_shares)) - d.addCallback(lambda ign: _check("large-corrupt-shares", - self.check_is_healthy)) - d.addCallback(lambda ign: _check("large-unrecoverable", - self.check_is_unrecoverable)) - - # and again with verify=True, which *does* detect corrupt shares. - def _checkv(which, checker): - d = self.nodes[which].check(Monitor(), verify=True) - d.addCallback(checker, which + "--check-and-verify") - return d - - d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy)) - d.addCallback(lambda ign: _checkv("mutable-missing-shares", - self.check_is_missing_shares)) - d.addCallback(lambda ign: _checkv("mutable-corrupt-shares", - self.check_has_corrupt_shares)) - d.addCallback(lambda ign: _checkv("mutable-unrecoverable", - self.check_is_unrecoverable)) - d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy)) - d.addCallback(lambda ign: _checkv("large-missing-shares", self.check_is_missing_shares)) - d.addCallback(lambda ign: _checkv("large-corrupt-shares", self.check_has_corrupt_shares)) - d.addCallback(lambda ign: _checkv("large-unrecoverable", - self.check_is_unrecoverable)) - - return d - - def do_deepcheck(self, ignored): - d = defer.succeed(None) - - # now deep-check the root, with various verify= and repair= options - d.addCallback(lambda ign: - self.root.start_deep_check().when_done()) - def _check1(cr): - self.failUnless(IDeepCheckResults.providedBy(cr)) - c = cr.get_counters() - self.failUnlessEqual(c["count-objects-checked"], 9) - self.failUnlessEqual(c["count-objects-healthy"], 5) - self.failUnlessEqual(c["count-objects-unhealthy"], 4) - self.failUnlessEqual(c["count-objects-unrecoverable"], 2) - d.addCallback(_check1) - - d.addCallback(lambda ign: - self.root.start_deep_check(verify=True).when_done()) - def _check2(cr): - self.failUnless(IDeepCheckResults.providedBy(cr)) - c = cr.get_counters() - self.failUnlessEqual(c["count-objects-checked"], 9) - self.failUnlessEqual(c["count-objects-healthy"], 3) - self.failUnlessEqual(c["count-objects-unhealthy"], 6) - self.failUnlessEqual(c["count-objects-healthy"], 3) # root, mutable good, large good - self.failUnlessEqual(c["count-objects-unrecoverable"], 2) # mutable unrecoverable, large unrecoverable - d.addCallback(_check2) - - return d - - def json_is_healthy(self, data, where): - r = data["results"] - self.failUnless(r["healthy"], where) - self.failUnless(r["recoverable"], where) - self.failUnlessEqual(r["count-recoverable-versions"], 1, where) - self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where) - - def json_is_missing_shares(self, data, where): - r = data["results"] - self.failIf(r["healthy"], where) - self.failUnless(r["recoverable"], where) - self.failUnlessEqual(r["count-recoverable-versions"], 1, where) - self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where) - - def json_has_corrupt_shares(self, data, where): - # by "corrupt-shares" we mean the file is still recoverable - r = data["results"] - self.failIf(r["healthy"], where) - self.failUnless(r["recoverable"], where) - self.failUnless(r["count-shares-good"] < 10, where) - self.failUnless(r["count-corrupt-shares"], where) - self.failUnless(r["list-corrupt-shares"], where) - - def json_is_unrecoverable(self, data, where): - r = data["results"] - self.failIf(r["healthy"], where) - self.failIf(r["recoverable"], where) - self.failUnless(r["count-shares-good"] < r["count-shares-needed"], - where) - self.failUnlessEqual(r["count-recoverable-versions"], 0, where) - self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where) - - def do_test_web_bad(self, ignored): - d = defer.succeed(None) - - # check, no verify - def _check(which, checker): - d = self.web_json(self.nodes[which], t="check") - d.addCallback(checker, which + "--webcheck") - return d - - d.addCallback(lambda ign: _check("mutable-good", - self.json_is_healthy)) - d.addCallback(lambda ign: _check("mutable-missing-shares", - self.json_is_missing_shares)) - d.addCallback(lambda ign: _check("mutable-corrupt-shares", - self.json_is_healthy)) - d.addCallback(lambda ign: _check("mutable-unrecoverable", - self.json_is_unrecoverable)) - d.addCallback(lambda ign: _check("large-good", - self.json_is_healthy)) - d.addCallback(lambda ign: _check("large-missing-shares", - self.json_is_missing_shares)) - d.addCallback(lambda ign: _check("large-corrupt-shares", - self.json_is_healthy)) - d.addCallback(lambda ign: _check("large-unrecoverable", - self.json_is_unrecoverable)) - - # check and verify - def _checkv(which, checker): - d = self.web_json(self.nodes[which], t="check", verify="true") - d.addCallback(checker, which + "--webcheck-and-verify") - return d - - d.addCallback(lambda ign: _checkv("mutable-good", - self.json_is_healthy)) - d.addCallback(lambda ign: _checkv("mutable-missing-shares", - self.json_is_missing_shares)) - d.addCallback(lambda ign: _checkv("mutable-corrupt-shares", - self.json_has_corrupt_shares)) - d.addCallback(lambda ign: _checkv("mutable-unrecoverable", - self.json_is_unrecoverable)) - d.addCallback(lambda ign: _checkv("large-good", - self.json_is_healthy)) - d.addCallback(lambda ign: _checkv("large-missing-shares", self.json_is_missing_shares)) - d.addCallback(lambda ign: _checkv("large-corrupt-shares", self.json_has_corrupt_shares)) - d.addCallback(lambda ign: _checkv("large-unrecoverable", - self.json_is_unrecoverable)) - - return d