From: Brian Warner Date: Tue, 17 Feb 2009 06:35:53 +0000 (-0700) Subject: webapi #590: add streaming deep-check. Still need a CLI tool to use it. X-Git-Tag: allmydata-tahoe-1.4.0~203 X-Git-Url: https://git.rkrishnan.org/simplejson/decoder.py.html?a=commitdiff_plain;h=476a5c8fac9909d53bd3b5b93eb2f2903f347b62;p=tahoe-lafs%2Ftahoe-lafs.git webapi #590: add streaming deep-check. Still need a CLI tool to use it. --- diff --git a/docs/frontends/webapi.txt b/docs/frontends/webapi.txt index 46eaf77f..0caa1eaa 100644 --- a/docs/frontends/webapi.txt +++ b/docs/frontends/webapi.txt @@ -885,6 +885,45 @@ POST $URL?t=start-deep-check (must add &ophandle=XYZ) stats: a dictionary with the same keys as the t=start-deep-stats command (described below) +POST $URL?t=stream-deep-check + + This initiates a recursive walk of all files and directories reachable from + the target, performing a check on each one just like t=check. For each + unique object (duplicates are skipped), a single line of JSON is emitted to + the HTTP response channel. When the walk is complete, a final line of JSON + is emitted which contains the accumulated file-size/count "deep-stats" data. + + This command takes the same arguments as t=start-deep-check. + + A CLI tool can split the response stream on newlines into "response units", + and parse each response unit as JSON. Each such parsed unit will be a + dictionary, and will contain at least the "type" key: a string, one of + "file", "directory", or "stats". + + For all units that have a type of "file" or "directory", the dictionary will + contain the following keys: + + "path": a list of strings, with the path that is traversed to reach the + object + "cap": a writecap for the file or directory, if available, else a readcap + "verifycap": a verifycap for the file or directory + "repaircap": the weakest cap which can still be used to repair the object + "storage-index": a base32 storage index for the object + "check-results": a copy of the dictionary which would be returned by + t=check&output=json, with three top-level keys: + "storage-index", "summary", and "results", and a variety + of counts and sharemaps in the "results" value. + + Note that non-distributed files (i.e. LIT files) will have values of None + for verifycap, repaircap, and storage-index, since these files can neither + be verified nor repaired, and are not stored on the storage servers. + Likewise the check-results dictionary will be limited: an empty string for + storage-index, and a results dictionary with only the "healthy" key. + + The last unit in the stream will have a type of "stats", and will contain + the keys described in the "start-deep-stats" operation, below. + + POST $URL?t=check&repair=true This performs a health check of the given file or directory, and if the diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 68319c09..a749e0c5 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -2793,3 +2793,150 @@ class Grid(GridTestMixin, WebErrorMixin, unittest.TestCase): d.addErrback(self.explain_web_error) return d + + def test_deep_check(self): + self.basedir = "web/Grid/deep_check" + self.set_up_grid() + c0 = self.g.clients[0] + self.uris = {} + self.fileurls = {} + DATA = "data" * 100 + d = c0.create_empty_dirnode() + def _stash_root_and_create_file(n): + self.rootnode = n + self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/" + return n.add_file(u"good", upload.Data(DATA, convergence="")) + d.addCallback(_stash_root_and_create_file) + def _stash_uri(fn, which): + self.uris[which] = fn.get_uri() + d.addCallback(_stash_uri, "good") + d.addCallback(lambda ign: + self.rootnode.add_file(u"small", + upload.Data("literal", + convergence=""))) + d.addCallback(_stash_uri, "small") + + d.addCallback(self.CHECK, "root", "t=stream-deep-check") + def _done(res): + units = [simplejson.loads(line) + for line in res.splitlines() + if line] + self.failUnlessEqual(len(units), 3+1) + # should be parent-first + u0 = units[0] + self.failUnlessEqual(u0["path"], []) + self.failUnlessEqual(u0["type"], "directory") + self.failUnlessEqual(u0["cap"], self.rootnode.get_uri()) + u0cr = u0["check-results"] + self.failUnlessEqual(u0cr["results"]["count-shares-good"], 10) + + ugood = [u for u in units + if u["type"] == "file" and u["path"] == [u"good"]][0] + self.failUnlessEqual(ugood["cap"], self.uris["good"]) + ugoodcr = ugood["check-results"] + self.failUnlessEqual(ugoodcr["results"]["count-shares-good"], 10) + + stats = units[-1] + self.failUnlessEqual(stats["type"], "stats") + s = stats["stats"] + self.failUnlessEqual(s["count-immutable-files"], 1) + self.failUnlessEqual(s["count-literal-files"], 1) + self.failUnlessEqual(s["count-directories"], 1) + d.addCallback(_done) + + d.addErrback(self.explain_web_error) + return d + + def test_deep_check_and_repair(self): + self.basedir = "web/Grid/deep_check_and_repair" + self.set_up_grid() + c0 = self.g.clients[0] + self.uris = {} + self.fileurls = {} + DATA = "data" * 100 + d = c0.create_empty_dirnode() + def _stash_root_and_create_file(n): + self.rootnode = n + self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/" + return n.add_file(u"good", upload.Data(DATA, convergence="")) + d.addCallback(_stash_root_and_create_file) + def _stash_uri(fn, which): + self.uris[which] = fn.get_uri() + d.addCallback(_stash_uri, "good") + d.addCallback(lambda ign: + self.rootnode.add_file(u"small", + upload.Data("literal", + convergence=""))) + d.addCallback(_stash_uri, "small") + d.addCallback(lambda ign: + self.rootnode.add_file(u"sick", + upload.Data(DATA+"1", + convergence=""))) + d.addCallback(_stash_uri, "sick") + #d.addCallback(lambda ign: + # self.rootnode.add_file(u"dead", + # upload.Data(DATA+"2", + # convergence=""))) + #d.addCallback(_stash_uri, "dead") + + #d.addCallback(lambda ign: c0.create_mutable_file("mutable")) + #d.addCallback(lambda fn: self.rootnode.set_node(u"corrupt", fn)) + #d.addCallback(_stash_uri, "corrupt") + + def _clobber_shares(ignored): + good_shares = self.find_shares(self.uris["good"]) + self.failUnlessEqual(len(good_shares), 10) + sick_shares = self.find_shares(self.uris["sick"]) + os.unlink(sick_shares[0][2]) + #dead_shares = self.find_shares(self.uris["dead"]) + #for i in range(1, 10): + # os.unlink(dead_shares[i][2]) + + #c_shares = self.find_shares(self.uris["corrupt"]) + #cso = CorruptShareOptions() + #cso.stdout = StringIO() + #cso.parseOptions([c_shares[0][2]]) + #corrupt_share(cso) + d.addCallback(_clobber_shares) + + d.addCallback(self.CHECK, "root", "t=stream-deep-check&repair=true") + def _done(res): + units = [simplejson.loads(line) + for line in res.splitlines() + if line] + self.failUnlessEqual(len(units), 4+1) + # should be parent-first + u0 = units[0] + self.failUnlessEqual(u0["path"], []) + self.failUnlessEqual(u0["type"], "directory") + self.failUnlessEqual(u0["cap"], self.rootnode.get_uri()) + u0crr = u0["check-and-repair-results"] + self.failUnlessEqual(u0crr["repair-attempted"], False) + self.failUnlessEqual(u0crr["pre-repair-results"]["results"]["count-shares-good"], 10) + + ugood = [u for u in units + if u["type"] == "file" and u["path"] == [u"good"]][0] + self.failUnlessEqual(ugood["cap"], self.uris["good"]) + ugoodcrr = ugood["check-and-repair-results"] + self.failUnlessEqual(u0crr["repair-attempted"], False) + self.failUnlessEqual(u0crr["pre-repair-results"]["results"]["count-shares-good"], 10) + + usick = [u for u in units + if u["type"] == "file" and u["path"] == [u"sick"]][0] + self.failUnlessEqual(usick["cap"], self.uris["sick"]) + usickcrr = usick["check-and-repair-results"] + self.failUnlessEqual(usickcrr["repair-attempted"], True) + self.failUnlessEqual(usickcrr["repair-successful"], True) + self.failUnlessEqual(usickcrr["pre-repair-results"]["results"]["count-shares-good"], 9) + self.failUnlessEqual(usickcrr["post-repair-results"]["results"]["count-shares-good"], 10) + + stats = units[-1] + self.failUnlessEqual(stats["type"], "stats") + s = stats["stats"] + self.failUnlessEqual(s["count-immutable-files"], 2) + self.failUnlessEqual(s["count-literal-files"], 1) + self.failUnlessEqual(s["count-directories"], 1) + d.addCallback(_done) + + d.addErrback(self.explain_web_error) + return d diff --git a/src/allmydata/web/check_results.py b/src/allmydata/web/check_results.py index d8af3648..bf7d75db 100644 --- a/src/allmydata/web/check_results.py +++ b/src/allmydata/web/check_results.py @@ -9,6 +9,64 @@ from allmydata.web.operations import ReloadMixin from allmydata.interfaces import ICheckAndRepairResults, ICheckResults from allmydata.util import base32, idlib +def json_check_counts(d): + r = {} + r["count-shares-good"] = d["count-shares-good"] + r["count-shares-needed"] = d["count-shares-needed"] + r["count-shares-expected"] = d["count-shares-expected"] + r["count-good-share-hosts"] = d["count-good-share-hosts"] + r["count-corrupt-shares"] = d["count-corrupt-shares"] + r["list-corrupt-shares"] = [ (idlib.nodeid_b2a(serverid), + base32.b2a(si), shnum) + for (serverid, si, shnum) + in d["list-corrupt-shares"] ] + r["servers-responding"] = [idlib.nodeid_b2a(serverid) + for serverid in d["servers-responding"]] + sharemap = {} + for (shareid, serverids) in d["sharemap"].items(): + sharemap[shareid] = [idlib.nodeid_b2a(serverid) + for serverid in serverids] + r["sharemap"] = sharemap + + r["count-wrong-shares"] = d["count-wrong-shares"] + r["count-recoverable-versions"] = d["count-recoverable-versions"] + r["count-unrecoverable-versions"] = d["count-unrecoverable-versions"] + + return r + +def json_check_results(r): + if r is None: + # LIT file + data = {"storage-index": "", + "results": {"healthy": True}, + } + return data + data = {} + data["storage-index"] = r.get_storage_index_string() + data["summary"] = r.get_summary() + data["results"] = json_check_counts(r.get_data()) + data["results"]["needs-rebalancing"] = r.needs_rebalancing() + data["results"]["healthy"] = r.is_healthy() + data["results"]["recoverable"] = r.is_recoverable() + return data + +def json_check_and_repair_results(r): + if r is None: + # LIT file + data = {"storage-index": "", + "repair-attempted": False, + } + return data + data = {} + data["storage-index"] = r.get_storage_index_string() + data["repair-attempted"] = r.get_repair_attempted() + data["repair-successful"] = r.get_repair_successful() + pre = r.get_pre_repair_results() + data["pre-repair-results"] = json_check_results(pre) + post = r.get_post_repair_results() + data["post-repair-results"] = json_check_results(post) + return data + class ResultsBase: def _join_pathstring(self, path): if path: @@ -94,52 +152,6 @@ class ResultsBase: return T.ul[r] - def _json_check_and_repair_results(self, r): - data = {} - data["storage-index"] = r.get_storage_index_string() - data["repair-attempted"] = r.get_repair_attempted() - data["repair-successful"] = r.get_repair_successful() - pre = r.get_pre_repair_results() - data["pre-repair-results"] = self._json_check_results(pre) - post = r.get_post_repair_results() - data["post-repair-results"] = self._json_check_results(post) - return data - - def _json_check_results(self, r): - data = {} - data["storage-index"] = r.get_storage_index_string() - data["summary"] = r.get_summary() - data["results"] = self._json_check_counts(r.get_data()) - data["results"]["needs-rebalancing"] = r.needs_rebalancing() - data["results"]["healthy"] = r.is_healthy() - data["results"]["recoverable"] = r.is_recoverable() - return data - - def _json_check_counts(self, d): - r = {} - r["count-shares-good"] = d["count-shares-good"] - r["count-shares-needed"] = d["count-shares-needed"] - r["count-shares-expected"] = d["count-shares-expected"] - r["count-good-share-hosts"] = d["count-good-share-hosts"] - r["count-corrupt-shares"] = d["count-corrupt-shares"] - r["list-corrupt-shares"] = [ (idlib.nodeid_b2a(serverid), - base32.b2a(si), shnum) - for (serverid, si, shnum) - in d["list-corrupt-shares"] ] - r["servers-responding"] = [idlib.nodeid_b2a(serverid) - for serverid in d["servers-responding"]] - sharemap = {} - for (shareid, serverids) in d["sharemap"].items(): - sharemap[shareid] = [idlib.nodeid_b2a(serverid) - for serverid in serverids] - r["sharemap"] = sharemap - - r["count-wrong-shares"] = d["count-wrong-shares"] - r["count-recoverable-versions"] = d["count-recoverable-versions"] - r["count-unrecoverable-versions"] = d["count-unrecoverable-versions"] - - return r - def _html(self, s): if isinstance(s, (str, unicode)): return html.escape(s) @@ -210,7 +222,7 @@ class CheckResults(CheckerBase, rend.Page, ResultsBase): def json(self, ctx): inevow.IRequest(ctx).setHeader("content-type", "text/plain") - data = self._json_check_results(self.r) + data = json_check_results(self.r) return simplejson.dumps(data, indent=1) + "\n" def render_summary(self, ctx, data): @@ -249,7 +261,7 @@ class CheckAndRepairResults(CheckerBase, rend.Page, ResultsBase): def json(self, ctx): inevow.IRequest(ctx).setHeader("content-type", "text/plain") - data = self._json_check_and_repair_results(self.r) + data = json_check_and_repair_results(self.r) return simplejson.dumps(data, indent=1) + "\n" def render_summary(self, ctx, data): @@ -324,7 +336,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin): shnum) for (serverid, storage_index, shnum) in res.get_corrupt_shares() ] - data["list-unhealthy-files"] = [ (path_t, self._json_check_results(r)) + data["list-unhealthy-files"] = [ (path_t, json_check_results(r)) for (path_t, r) in res.get_all_results().items() if not r.is_healthy() ] @@ -496,7 +508,7 @@ class DeepCheckAndRepairResults(rend.Page, ResultsBase, ReloadMixin): data["list-remaining-corrupt-shares"] = remaining_corrupt unhealthy = [ (path_t, - self._json_check_results(crr.get_pre_repair_results())) + json_check_results(crr.get_pre_repair_results())) for (path_t, crr) in res.get_all_results().items() if not crr.get_pre_repair_results().is_healthy() ] diff --git a/src/allmydata/web/directory.py b/src/allmydata/web/directory.py index 2f00f088..367ed9e4 100644 --- a/src/allmydata/web/directory.py +++ b/src/allmydata/web/directory.py @@ -30,6 +30,8 @@ from allmydata.web.check_results import CheckResults, \ CheckAndRepairResults, DeepCheckResults, DeepCheckAndRepairResults from allmydata.web.info import MoreInfo from allmydata.web.operations import ReloadMixin +from allmydata.web.check_results import json_check_results, \ + json_check_and_repair_results class BlockingFileError(Exception): # TODO: catch and transform @@ -189,6 +191,8 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): d = self._POST_check(req) elif t == "start-deep-check": d = self._POST_start_deep_check(ctx) + elif t == "stream-deep-check": + d = self._POST_stream_deep_check(ctx) elif t == "start-manifest": d = self._POST_start_manifest(ctx) elif t == "start-deep-size": @@ -378,6 +382,25 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): renderer = DeepCheckResults(monitor) return self._start_operation(monitor, renderer, ctx) + def _POST_stream_deep_check(self, ctx): + verify = boolean_of_arg(get_arg(ctx, "verify", "false")) + repair = boolean_of_arg(get_arg(ctx, "repair", "false")) + walker = DeepCheckStreamer(ctx, self.node, verify, repair) + monitor = self.node.deep_traverse(walker) + walker.setMonitor(monitor) + # register to hear stopProducing. The walker ignores pauseProducing. + IRequest(ctx).registerProducer(walker, True) + d = monitor.when_done() + def _done(res): + IRequest(ctx).unregisterProducer() + return res + d.addBoth(_done) + def _cancelled(f): + f.trap(OperationCancelledError) + return "Operation Cancelled" + d.addErrback(_cancelled) + return d + def _POST_start_manifest(self, ctx): if not get_arg(ctx, "ophandle"): raise NeedOperationHandleError("slow operation requires ophandle=") @@ -903,3 +926,78 @@ class ManifestStreamer(dirnode.DeepStats): assert "\n" not in j self.req.write(j+"\n") return "" + +class DeepCheckStreamer(dirnode.DeepStats): + implements(IPushProducer) + + def __init__(self, ctx, origin, verify, repair): + dirnode.DeepStats.__init__(self, origin) + self.req = IRequest(ctx) + self.verify = verify + self.repair = repair + + def setMonitor(self, monitor): + self.monitor = monitor + def pauseProducing(self): + pass + def resumeProducing(self): + pass + def stopProducing(self): + self.monitor.cancel() + + def add_node(self, node, path): + dirnode.DeepStats.add_node(self, node, path) + data = {"path": path, + "cap": node.get_uri()} + + if IDirectoryNode.providedBy(node): + data["type"] = "directory" + else: + data["type"] = "file" + + v = node.get_verify_cap() + if v: + v = v.to_string() + data["verifycap"] = v + + r = node.get_repair_cap() + if r: + r = r.to_string() + data["repaircap"] = r + + si = node.get_storage_index() + if si: + si = base32.b2a(si) + data["storage-index"] = si + + if self.repair: + d = node.check_and_repair(self.monitor, self.verify) + d.addCallback(self.add_check_and_repair, data) + else: + d = node.check(self.monitor, self.verify) + d.addCallback(self.add_check, data) + d.addCallback(self.write_line) + return d + + def add_check_and_repair(self, crr, data): + data["check-and-repair-results"] = json_check_and_repair_results(crr) + return data + + def add_check(self, cr, data): + data["check-results"] = json_check_results(cr) + return data + + def write_line(self, data): + j = simplejson.dumps(data, ensure_ascii=True) + assert "\n" not in j + self.req.write(j+"\n") + + def finish(self): + stats = dirnode.DeepStats.get_results(self) + d = {"type": "stats", + "stats": stats, + } + j = simplejson.dumps(d, ensure_ascii=True) + assert "\n" not in j + self.req.write(j+"\n") + return ""