From: Brian Warner Date: Wed, 18 Feb 2009 00:15:11 +0000 (-0700) Subject: CLI #590: convert 'tahoe deep-check' to streaming form, improve display, add tests X-Git-Tag: allmydata-tahoe-1.4.0~200 X-Git-Url: https://git.rkrishnan.org/pf/content/simplejson/provisioning?a=commitdiff_plain;h=fde2289e7b1fda8af4a8ee1fe0830d23beaaf77b;p=tahoe-lafs%2Ftahoe-lafs.git CLI #590: convert 'tahoe deep-check' to streaming form, improve display, add tests --- diff --git a/docs/frontends/webapi.txt b/docs/frontends/webapi.txt index 0caa1eaa..49bfa748 100644 --- a/docs/frontends/webapi.txt +++ b/docs/frontends/webapi.txt @@ -1006,6 +1006,24 @@ POST $URL?t=start-deep-check&repair=true (must add &ophandle=XYZ) stats: a dictionary with the same keys as the t=start-deep-stats command (described below) +POST $URL?t=stream-deep-check&repair=true + + This triggers a recursive walk of all files and directories, performing a + t=check&repair=true on each one. For each unique object (duplicates are + skipped), a single line of JSON is emitted to the HTTP response channel. + When the walk is complete, a final line of JSON is emitted which contains + the accumulated file-size/count "deep-stats" data. + + This emits the same data as t=stream-deep-check (without the repair=true), + except that the "check-results" field is replaced with a + "check-and-repair-results" field, which contains the keys returned by + t=check&repair=true&output=json (i.e. repair-attempted, repair-successful, + pre-repair-results, and post-repair-results). The output does not contain + the summary dictionary that is provied by t=start-deep-check&repair=true + (the one with count-objects-checked and list-unhealthy-files), since the + receiving client is expected to calculate those values itself from the + stream of per-object check-and-repair-results. + POST $DIRURL?t=start-manifest (must add &ophandle=XYZ) This operation generates a "manfest" of the given directory tree, mostly diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index c44acffa..ad7bc48c 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -228,8 +228,10 @@ class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): prr.data['servers-responding'] = list(servers_responding) prr.data['count-shares-good'] = len(sm) prr.data['count-good-share-hosts'] = len(sm) - is_healthy = len(sm) >= self.u.total_shares + is_healthy = bool(len(sm) >= self.u.total_shares) + is_recoverable = bool(len(sm) >= self.u.needed_shares) prr.set_healthy(is_healthy) + prr.set_recoverable(is_recoverable) crr.repair_successful = is_healthy prr.set_needs_rebalancing(len(sm) >= self.u.total_shares) diff --git a/src/allmydata/scripts/cli.py b/src/allmydata/scripts/cli.py index da1a1374..c8cf7039 100644 --- a/src/allmydata/scripts/cli.py +++ b/src/allmydata/scripts/cli.py @@ -244,9 +244,9 @@ class StatsOptions(VDriveOptions): class CheckOptions(VDriveOptions): optFlags = [ - ("raw", "r", "Display raw JSON data instead of parsed"), - ("verify", "v", "Verify all hashes, instead of merely querying share presence"), - ("repair", "r", "Automatically repair any problems found"), + ("raw", None, "Display raw JSON data instead of parsed"), + ("verify", None, "Verify all hashes, instead of merely querying share presence"), + ("repair", None, "Automatically repair any problems found"), ] def parseArgs(self, where=''): self.where = where @@ -258,9 +258,10 @@ class CheckOptions(VDriveOptions): class DeepCheckOptions(VDriveOptions): optFlags = [ - ("raw", "r", "Display raw JSON data instead of parsed"), - ("verify", "v", "Verify all hashes, instead of merely querying share presence"), - ("repair", "r", "Automatically repair any problems found"), + ("raw", None, "Display raw JSON data instead of parsed"), + ("verify", None, "Verify all hashes, instead of merely querying share presence"), + ("repair", None, "Automatically repair any problems found"), + ("verbose", "v", "Be noisy about what is happening."), ] def parseArgs(self, where=''): self.where = where diff --git a/src/allmydata/scripts/tahoe_check.py b/src/allmydata/scripts/tahoe_check.py index a52d5b2d..0574da7d 100644 --- a/src/allmydata/scripts/tahoe_check.py +++ b/src/allmydata/scripts/tahoe_check.py @@ -1,10 +1,9 @@ -from pprint import pprint import urllib import simplejson +from twisted.protocols.basic import LineOnlyReceiver from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path from allmydata.scripts.common_http import do_http -from allmydata.scripts.slow_operation import SlowOperationRunner class Checker: pass @@ -42,44 +41,230 @@ def check(options): if options["repair"]: # show repair status - pprint(data, stream=stdout) + if data["pre-repair-results"]["results"]["healthy"]: + summary = "healthy" + else: + summary = "not healthy" + stdout.write("Summary: %s\n" % summary) + cr = data["pre-repair-results"]["results"] + stdout.write(" storage index: %s\n" % data["storage-index"]) + stdout.write(" good-shares: %d (encoding is %d-of-%d)\n" + % (cr["count-shares-good"], + cr["count-shares-needed"], + cr["count-shares-expected"])) + stdout.write(" wrong-shares: %d\n" % cr["count-wrong-shares"]) + corrupt = cr["list-corrupt-shares"] + if corrupt: + stdout.write(" corrupt shares:\n") + for (serverid, storage_index, sharenum) in corrupt: + stdout.write(" server %s, SI %s, shnum %d\n" % + (serverid, storage_index, sharenum)) + if data["repair-attempted"]: + if data["repair-successful"]: + stdout.write(" repair successful\n") + else: + stdout.write(" repair failed\n") else: - # make this prettier - pprint(data, stream=stdout) + stdout.write("Summary: %s\n" % data["summary"]) + cr = data["results"] + stdout.write(" storage index: %s\n" % data["storage-index"]) + stdout.write(" good-shares: %d (encoding is %d-of-%d)\n" + % (cr["count-shares-good"], + cr["count-shares-needed"], + cr["count-shares-expected"])) + stdout.write(" wrong-shares: %d\n" % cr["count-wrong-shares"]) + corrupt = cr["list-corrupt-shares"] + if corrupt: + stdout.write(" corrupt shares:\n") + for (serverid, storage_index, sharenum) in corrupt: + stdout.write(" server %s, SI %s, shnum %d\n" % + (serverid, storage_index, sharenum)) return 0 -class DeepChecker(SlowOperationRunner): +class FakeTransport: + disconnecting = False + +class DeepCheckOutput(LineOnlyReceiver): + delimiter = "\n" + def __init__(self, options): + self.transport = FakeTransport() + + self.verbose = bool(options["verbose"]) + self.stdout = options.stdout + self.num_objects = 0 + self.files_healthy = 0 + self.files_unhealthy = 0 + + def lineReceived(self, line): + d = simplejson.loads(line) + stdout = self.stdout + if d["type"] not in ("file", "directory"): + return + self.num_objects += 1 + # non-verbose means print a progress marker every 100 files + if self.num_objects % 100 == 0: + print >>stdout, "%d objects checked.." % self.num_objects + cr = d["check-results"] + if cr["results"]["healthy"]: + self.files_healthy += 1 + else: + self.files_unhealthy += 1 + if self.verbose: + # verbose means also print one line per file + path = d["path"] + if not path: + path = [""] + summary = cr.get("summary", "Healthy (LIT)") + try: + print >>stdout, "%s: %s" % ("/".join(path), summary) + except UnicodeEncodeError: + print >>stdout, "%s: %s" % ("/".join([p.encode("utf-8") + for p in path]), + summary) + # always print out corrupt shares + for shareloc in cr["results"].get("list-corrupt-shares", []): + (serverid, storage_index, sharenum) = shareloc + print >>stdout, " corrupt: server %s, SI %s, shnum %d" % \ + (serverid, storage_index, sharenum) + + def done(self): + stdout = self.stdout + print >>stdout, "done: %d objects checked, %d healthy, %d unhealthy" \ + % (self.num_objects, self.files_healthy, self.files_unhealthy) + +class DeepCheckAndRepairOutput(LineOnlyReceiver): + delimiter = "\n" + def __init__(self, options): + self.transport = FakeTransport() + + self.verbose = bool(options["verbose"]) + self.stdout = options.stdout + self.num_objects = 0 + self.pre_repair_files_healthy = 0 + self.pre_repair_files_unhealthy = 0 + self.repairs_attempted = 0 + self.repairs_successful = 0 + self.post_repair_files_healthy = 0 + self.post_repair_files_unhealthy = 0 - def make_url(self, base, ophandle): - url = base + "?t=start-deep-check&ophandle=" + ophandle - if self.options["verify"]: + def lineReceived(self, line): + d = simplejson.loads(line) + stdout = self.stdout + if d["type"] not in ("file", "directory"): + return + self.num_objects += 1 + # non-verbose means print a progress marker every 100 files + if self.num_objects % 100 == 0: + print >>stdout, "%d objects checked.." % self.num_objects + crr = d["check-and-repair-results"] + if d["storage-index"]: + if crr["pre-repair-results"]["results"]["healthy"]: + was_healthy = True + self.pre_repair_files_healthy += 1 + else: + was_healthy = False + self.pre_repair_files_unhealthy += 1 + if crr["post-repair-results"]["results"]["healthy"]: + self.post_repair_files_healthy += 1 + else: + self.post_repair_files_unhealthy += 1 + else: + # LIT file + was_healthy = True + self.pre_repair_files_healthy += 1 + self.post_repair_files_healthy += 1 + if crr["repair-attempted"]: + self.repairs_attempted += 1 + if crr["repair-successful"]: + self.repairs_successful += 1 + if self.verbose: + # verbose means also print one line per file + path = d["path"] + if not path: + path = [""] + # we don't seem to have a summary available, so build one + if was_healthy: + summary = "healthy" + else: + summary = "not healthy" + try: + print >>stdout, "%s: %s" % ("/".join(path), summary) + except UnicodeEncodeError: + print >>stdout, "%s: %s" % ("/".join([p.encode("utf-8") + for p in path]), + summary) + # always print out corrupt shares + prr = crr.get("pre-repair-results", {}) + for shareloc in prr.get("results", {}).get("list-corrupt-shares", []): + (serverid, storage_index, sharenum) = shareloc + print >>stdout, " corrupt: server %s, SI %s, shnum %d" % \ + (serverid, storage_index, sharenum) + + # always print out repairs + if crr["repair-attempted"]: + if crr["repair-successful"]: + print >>stdout, " repair successful" + else: + print >>stdout, " repair failed" + + def done(self): + stdout = self.stdout + print >>stdout, "done: %d objects checked" % self.num_objects + print >>stdout, " pre-repair: %d healthy, %d unhealthy" \ + % (self.pre_repair_files_healthy, + self.pre_repair_files_unhealthy) + print >>stdout, " %d repairs attempted, %d successful, %d failed" \ + % (self.repairs_attempted, + self.repairs_successful, + (self.repairs_attempted - self.repairs_successful)) + print >>stdout, " post-repair: %d healthy, %d unhealthy" \ + % (self.post_repair_files_healthy, + self.post_repair_files_unhealthy) + +class DeepCheckStreamer(LineOnlyReceiver): + + def run(self, options): + stdout = options.stdout + stderr = options.stderr + self.options = options + nodeurl = options['node-url'] + if not nodeurl.endswith("/"): + nodeurl += "/" + self.nodeurl = nodeurl + where = options.where + rootcap, path = get_alias(options.aliases, where, DEFAULT_ALIAS) + if path == '/': + path = '' + url = nodeurl + "uri/%s" % urllib.quote(rootcap) + if path: + url += "/" + escape_path(path) + # todo: should it end with a slash? + url += "?t=stream-deep-check" + if options["verify"]: url += "&verify=true" - if self.options["repair"]: + if options["repair"]: url += "&repair=true" - return url - - def write_results(self, data): - out = self.options.stdout - err = self.options.stderr - if self.options["repair"]: - # todo: make this prettier - pprint(data, stream=out) + output = DeepCheckAndRepairOutput(options) else: - print >>out, "Objects Checked: %d" % data["count-objects-checked"] - print >>out, "Objects Healthy: %d" % data["count-objects-healthy"] - print >>out, "Objects Unhealthy: %d" % data["count-objects-unhealthy"] - print >>out - if data["list-unhealthy-files"]: - print "Unhealthy Files:" - for (pathname, cr) in data["list-unhealthy-files"]: - if pathname: - path_s = "/".join(pathname) - else: - path_s = "" - print >>out, path_s, ":", cr["summary"] + output = DeepCheckOutput(options) + resp = do_http("POST", url) + if resp.status not in (200, 302): + print >>stderr, "ERROR", resp.status, resp.reason, resp.read() + return 1 + # use Twisted to split this into lines + while True: + chunk = resp.read(100) + if not chunk: + break + if self.options["raw"]: + stdout.write(chunk) + else: + output.dataReceived(chunk) + if not self.options["raw"]: + output.done() + return 0 def deepcheck(options): - return DeepChecker().run(options) - + return DeepCheckStreamer().run(options) diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py index 9089ca5e..15f46ee9 100644 --- a/src/allmydata/test/test_cli.py +++ b/src/allmydata/test/test_cli.py @@ -5,9 +5,11 @@ from twisted.trial import unittest from cStringIO import StringIO import urllib import re +import simplejson -from allmydata.util import fileutil, hashutil +from allmydata.util import fileutil, hashutil, base32 from allmydata import uri +from allmydata.immutable import upload # Test that the scripts can be imported -- although the actual tests of their functionality are # done by invoking them in a subprocess. @@ -928,3 +930,222 @@ class Backup(GridTestMixin, CLITestMixin, StallMixin, unittest.TestCase): # dirnodes being created (RSA key generation). The backup between check4 # and check4a takes 6s, as does the backup before check4b. test_backup.timeout = 300 + +class Check(GridTestMixin, CLITestMixin, unittest.TestCase): + + def test_check(self): + self.basedir = "cli/Check/check" + self.set_up_grid() + c0 = self.g.clients[0] + DATA = "data" * 100 + d = c0.create_mutable_file(DATA) + def _stash_uri(n): + self.uri = n.get_uri() + d.addCallback(_stash_uri) + + d.addCallback(lambda ign: self.do_cli("check", self.uri)) + def _check1((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + self.failUnless("Summary: Healthy" in lines, out) + self.failUnless(" good-shares: 10 (encoding is 3-of-10)" in lines, out) + d.addCallback(_check1) + + d.addCallback(lambda ign: self.do_cli("check", "--raw", self.uri)) + def _check2((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + data = simplejson.loads(out) + self.failUnlessEqual(data["summary"], "Healthy") + d.addCallback(_check2) + + def _clobber_shares(ignored): + # delete one, corrupt a second + shares = self.find_shares(self.uri) + self.failUnlessEqual(len(shares), 10) + os.unlink(shares[0][2]) + cso = debug.CorruptShareOptions() + cso.stdout = StringIO() + cso.parseOptions([shares[1][2]]) + storage_index = uri.from_string(self.uri).get_storage_index() + self._corrupt_share_line = " server %s, SI %s, shnum %d" % \ + (base32.b2a(shares[1][1]), + base32.b2a(storage_index), + shares[1][0]) + debug.corrupt_share(cso) + d.addCallback(_clobber_shares) + + d.addCallback(lambda ign: self.do_cli("check", "--verify", self.uri)) + def _check3((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + summary = [l for l in lines if l.startswith("Summary")][0] + self.failUnless("Summary: Unhealthy: 8 shares (enc 3-of-10)" + in summary, summary) + self.failUnless(" good-shares: 8 (encoding is 3-of-10)" in lines, out) + self.failUnless(" corrupt shares:" in lines, out) + self.failUnless(self._corrupt_share_line in lines, out) + d.addCallback(_check3) + + d.addCallback(lambda ign: + self.do_cli("check", "--verify", "--repair", self.uri)) + def _check4((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + self.failUnless("Summary: not healthy" in lines, out) + self.failUnless(" good-shares: 8 (encoding is 3-of-10)" in lines, out) + self.failUnless(" corrupt shares:" in lines, out) + self.failUnless(self._corrupt_share_line in lines, out) + self.failUnless(" repair successful" in lines, out) + d.addCallback(_check4) + + d.addCallback(lambda ign: + self.do_cli("check", "--verify", "--repair", self.uri)) + def _check5((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + self.failUnless("Summary: healthy" in lines, out) + self.failUnless(" good-shares: 10 (encoding is 3-of-10)" in lines, out) + self.failIf(" corrupt shares:" in lines, out) + d.addCallback(_check5) + + return d + + def test_deep_check(self): + self.basedir = "cli/Check/deep_check" + self.set_up_grid() + c0 = self.g.clients[0] + self.uris = {} + self.fileurls = {} + DATA = "data" * 100 + d = c0.create_empty_dirnode() + def _stash_root_and_create_file(n): + self.rootnode = n + self.rooturi = n.get_uri() + return n.add_file(u"good", upload.Data(DATA, convergence="")) + d.addCallback(_stash_root_and_create_file) + def _stash_uri(fn, which): + self.uris[which] = fn.get_uri() + d.addCallback(_stash_uri, "good") + d.addCallback(lambda ign: + self.rootnode.add_file(u"small", + upload.Data("literal", + convergence=""))) + d.addCallback(_stash_uri, "small") + d.addCallback(lambda ign: c0.create_mutable_file(DATA+"1")) + d.addCallback(lambda fn: self.rootnode.set_node(u"mutable", fn)) + d.addCallback(_stash_uri, "mutable") + + d.addCallback(lambda ign: self.do_cli("deep-check", self.rooturi)) + def _check1((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + self.failUnless("done: 4 objects checked, 4 healthy, 0 unhealthy" + in lines, out) + d.addCallback(_check1) + + d.addCallback(lambda ign: self.do_cli("deep-check", "--verbose", + self.rooturi)) + def _check2((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + self.failUnless(": Healthy" in lines, out) + self.failUnless("small: Healthy (LIT)" in lines, out) + self.failUnless("good: Healthy" in lines, out) + self.failUnless("mutable: Healthy" in lines, out) + self.failUnless("done: 4 objects checked, 4 healthy, 0 unhealthy" + in lines, out) + d.addCallback(_check2) + + def _clobber_shares(ignored): + shares = self.find_shares(self.uris["good"]) + self.failUnlessEqual(len(shares), 10) + os.unlink(shares[0][2]) + + shares = self.find_shares(self.uris["mutable"]) + cso = debug.CorruptShareOptions() + cso.stdout = StringIO() + cso.parseOptions([shares[1][2]]) + storage_index = uri.from_string(self.uris["mutable"]).get_storage_index() + self._corrupt_share_line = " corrupt: server %s, SI %s, shnum %d" % \ + (base32.b2a(shares[1][1]), + base32.b2a(storage_index), + shares[1][0]) + debug.corrupt_share(cso) + d.addCallback(_clobber_shares) + + d.addCallback(lambda ign: + self.do_cli("deep-check", "--verbose", self.rooturi)) + def _check3((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + self.failUnless(": Healthy" in lines, out) + self.failUnless("small: Healthy (LIT)" in lines, out) + self.failUnless("mutable: Healthy" in lines, out) # needs verifier + self.failUnless("good: Not Healthy: 9 shares (enc 3-of-10)" + in lines, out) + self.failIf(self._corrupt_share_line in lines, out) + self.failUnless("done: 4 objects checked, 3 healthy, 1 unhealthy" + in lines, out) + d.addCallback(_check3) + + d.addCallback(lambda ign: + self.do_cli("deep-check", "--verbose", "--verify", + self.rooturi)) + def _check4((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + self.failUnless(": Healthy" in lines, out) + self.failUnless("small: Healthy (LIT)" in lines, out) + mutable = [l for l in lines if l.startswith("mutable")][0] + self.failUnless(mutable.startswith("mutable: Unhealthy: 9 shares (enc 3-of-10)"), + mutable) + self.failUnless(self._corrupt_share_line in lines, out) + self.failUnless("good: Not Healthy: 9 shares (enc 3-of-10)" + in lines, out) + self.failUnless("done: 4 objects checked, 2 healthy, 2 unhealthy" + in lines, out) + d.addCallback(_check4) + + d.addCallback(lambda ign: + self.do_cli("deep-check", "--raw", + self.rooturi)) + def _check5((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + units = [simplejson.loads(line) for line in lines] + # root, small, good, mutable, stats + self.failUnlessEqual(len(units), 4+1) + d.addCallback(_check5) + + d.addCallback(lambda ign: + self.do_cli("deep-check", + "--verbose", "--verify", "--repair", + self.rooturi)) + def _check6((rc, out, err)): + self.failUnlessEqual(err, "") + self.failUnlessEqual(rc, 0) + lines = out.splitlines() + self.failUnless(": healthy" in lines, out) + self.failUnless("small: healthy" in lines, out) + self.failUnless("mutable: not healthy" in lines, out) + self.failUnless(self._corrupt_share_line in lines, out) + self.failUnless("good: not healthy" in lines, out) + self.failUnless("done: 4 objects checked" in lines, out) + self.failUnless(" pre-repair: 2 healthy, 2 unhealthy" in lines, out) + self.failUnless(" 2 repairs attempted, 2 successful, 0 failed" + in lines, out) + self.failUnless(" post-repair: 4 healthy, 0 unhealthy" in lines,out) + d.addCallback(_check6) + + return d +