stats: a dictionary with the same keys as the t=start-deep-stats command
(described below)
+POST $URL?t=stream-deep-check
+
+ This initiates a recursive walk of all files and directories reachable from
+ the target, performing a check on each one just like t=check. For each
+ unique object (duplicates are skipped), a single line of JSON is emitted to
+ the HTTP response channel. When the walk is complete, a final line of JSON
+ is emitted which contains the accumulated file-size/count "deep-stats" data.
+
+ This command takes the same arguments as t=start-deep-check.
+
+ A CLI tool can split the response stream on newlines into "response units",
+ and parse each response unit as JSON. Each such parsed unit will be a
+ dictionary, and will contain at least the "type" key: a string, one of
+ "file", "directory", or "stats".
+
+ For all units that have a type of "file" or "directory", the dictionary will
+ contain the following keys:
+
+ "path": a list of strings, with the path that is traversed to reach the
+ object
+ "cap": a writecap for the file or directory, if available, else a readcap
+ "verifycap": a verifycap for the file or directory
+ "repaircap": the weakest cap which can still be used to repair the object
+ "storage-index": a base32 storage index for the object
+  "check-results": a copy of the dictionary which would be returned by
+                   t=check&output=json, with three top-level keys:
+                   "storage-index", "summary", and "results", and a variety
+                   of counts and sharemaps in the "results" value. When
+                   repair=true is passed, this key is replaced by
+                   "check-and-repair-results": a copy of the dictionary
+                   which would be returned by t=check&repair=true&output=json.
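+
+ An illustrative "file" unit (caps, storage index, and most counts are
+ abbreviated here, not literal values) might look like:
+
+   {"type": "file", "path": ["subdir", "file.txt"],
+    "cap": "URI:CHK:...", "verifycap": "URI:CHK-Verifier:...",
+    "repaircap": "URI:CHK:...", "storage-index": "whlqm...",
+    "check-results": {"storage-index": "whlqm...",
+                      "summary": "Healthy",
+                      "results": {"healthy": true,
+                                  "count-shares-good": 10, ...}}}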
+
+ Note that non-distributed files (i.e. LIT files) will have null values for
+ verifycap, repaircap, and storage-index, since these files can neither be
+ verified nor repaired, and are not stored on the storage servers. Likewise
+ the check-results dictionary will be limited: an empty string for
+ storage-index, and a results dictionary that contains only the "healthy"
+ key.
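+
+ An illustrative unit for a LIT file would therefore look like:
+
+   {"type": "file", "path": ["small"], "cap": "URI:LIT:...",
+    "verifycap": null, "repaircap": null, "storage-index": null,
+    "check-results": {"storage-index": "",
+                      "results": {"healthy": true}}}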
+
+ The last unit in the stream will have a type of "stats", and will contain
+ the keys described in the "start-deep-stats" operation, below.
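+
+ An illustrative final unit (only a few of the deep-stats keys are shown):
+
+   {"type": "stats",
+    "stats": {"count-immutable-files": 1,
+              "count-literal-files": 1,
+              "count-directories": 1, ...}}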
+
+
POST $URL?t=check&repair=true
This performs a health check of the given file or directory, and if the
d.addErrback(self.explain_web_error)
return d
+
+ def test_deep_check(self):
+ self.basedir = "web/Grid/deep_check"
+ self.set_up_grid()
+ c0 = self.g.clients[0]
+ self.uris = {}
+ self.fileurls = {}
+ DATA = "data" * 100
+ d = c0.create_empty_dirnode()
+ def _stash_root_and_create_file(n):
+ self.rootnode = n
+ self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/"
+ return n.add_file(u"good", upload.Data(DATA, convergence=""))
+ d.addCallback(_stash_root_and_create_file)
+ def _stash_uri(fn, which):
+ self.uris[which] = fn.get_uri()
+ d.addCallback(_stash_uri, "good")
+ d.addCallback(lambda ign:
+ self.rootnode.add_file(u"small",
+ upload.Data("literal",
+ convergence="")))
+ d.addCallback(_stash_uri, "small")
+
+ d.addCallback(self.CHECK, "root", "t=stream-deep-check")
+ def _done(res):
+ units = [simplejson.loads(line)
+ for line in res.splitlines()
+ if line]
+ self.failUnlessEqual(len(units), 3+1)
+ # should be parent-first
+ u0 = units[0]
+ self.failUnlessEqual(u0["path"], [])
+ self.failUnlessEqual(u0["type"], "directory")
+ self.failUnlessEqual(u0["cap"], self.rootnode.get_uri())
+ u0cr = u0["check-results"]
+ self.failUnlessEqual(u0cr["results"]["count-shares-good"], 10)
+
+ ugood = [u for u in units
+ if u["type"] == "file" and u["path"] == [u"good"]][0]
+ self.failUnlessEqual(ugood["cap"], self.uris["good"])
+ ugoodcr = ugood["check-results"]
+ self.failUnlessEqual(ugoodcr["results"]["count-shares-good"], 10)
+
+ stats = units[-1]
+ self.failUnlessEqual(stats["type"], "stats")
+ s = stats["stats"]
+ self.failUnlessEqual(s["count-immutable-files"], 1)
+ self.failUnlessEqual(s["count-literal-files"], 1)
+ self.failUnlessEqual(s["count-directories"], 1)
+ d.addCallback(_done)
+
+ d.addErrback(self.explain_web_error)
+ return d
+
+ def test_deep_check_and_repair(self):
+ self.basedir = "web/Grid/deep_check_and_repair"
+ self.set_up_grid()
+ c0 = self.g.clients[0]
+ self.uris = {}
+ self.fileurls = {}
+ DATA = "data" * 100
+ d = c0.create_empty_dirnode()
+ def _stash_root_and_create_file(n):
+ self.rootnode = n
+ self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/"
+ return n.add_file(u"good", upload.Data(DATA, convergence=""))
+ d.addCallback(_stash_root_and_create_file)
+ def _stash_uri(fn, which):
+ self.uris[which] = fn.get_uri()
+ d.addCallback(_stash_uri, "good")
+ d.addCallback(lambda ign:
+ self.rootnode.add_file(u"small",
+ upload.Data("literal",
+ convergence="")))
+ d.addCallback(_stash_uri, "small")
+ d.addCallback(lambda ign:
+ self.rootnode.add_file(u"sick",
+ upload.Data(DATA+"1",
+ convergence="")))
+ d.addCallback(_stash_uri, "sick")
+ #d.addCallback(lambda ign:
+ # self.rootnode.add_file(u"dead",
+ # upload.Data(DATA+"2",
+ # convergence="")))
+ #d.addCallback(_stash_uri, "dead")
+
+ #d.addCallback(lambda ign: c0.create_mutable_file("mutable"))
+ #d.addCallback(lambda fn: self.rootnode.set_node(u"corrupt", fn))
+ #d.addCallback(_stash_uri, "corrupt")
+
+ def _clobber_shares(ignored):
+ good_shares = self.find_shares(self.uris["good"])
+ self.failUnlessEqual(len(good_shares), 10)
+ sick_shares = self.find_shares(self.uris["sick"])
+ os.unlink(sick_shares[0][2])
+ #dead_shares = self.find_shares(self.uris["dead"])
+ #for i in range(1, 10):
+ # os.unlink(dead_shares[i][2])
+
+ #c_shares = self.find_shares(self.uris["corrupt"])
+ #cso = CorruptShareOptions()
+ #cso.stdout = StringIO()
+ #cso.parseOptions([c_shares[0][2]])
+ #corrupt_share(cso)
+ d.addCallback(_clobber_shares)
+
+ d.addCallback(self.CHECK, "root", "t=stream-deep-check&repair=true")
+ def _done(res):
+ units = [simplejson.loads(line)
+ for line in res.splitlines()
+ if line]
+ self.failUnlessEqual(len(units), 4+1)
+ # should be parent-first
+ u0 = units[0]
+ self.failUnlessEqual(u0["path"], [])
+ self.failUnlessEqual(u0["type"], "directory")
+ self.failUnlessEqual(u0["cap"], self.rootnode.get_uri())
+ u0crr = u0["check-and-repair-results"]
+ self.failUnlessEqual(u0crr["repair-attempted"], False)
+ self.failUnlessEqual(u0crr["pre-repair-results"]["results"]["count-shares-good"], 10)
+
+ ugood = [u for u in units
+ if u["type"] == "file" and u["path"] == [u"good"]][0]
+ self.failUnlessEqual(ugood["cap"], self.uris["good"])
+            ugoodcrr = ugood["check-and-repair-results"]
+            self.failUnlessEqual(ugoodcrr["repair-attempted"], False)
+            self.failUnlessEqual(ugoodcrr["pre-repair-results"]["results"]["count-shares-good"], 10)
+
+ usick = [u for u in units
+ if u["type"] == "file" and u["path"] == [u"sick"]][0]
+ self.failUnlessEqual(usick["cap"], self.uris["sick"])
+ usickcrr = usick["check-and-repair-results"]
+ self.failUnlessEqual(usickcrr["repair-attempted"], True)
+ self.failUnlessEqual(usickcrr["repair-successful"], True)
+ self.failUnlessEqual(usickcrr["pre-repair-results"]["results"]["count-shares-good"], 9)
+ self.failUnlessEqual(usickcrr["post-repair-results"]["results"]["count-shares-good"], 10)
+
+ stats = units[-1]
+ self.failUnlessEqual(stats["type"], "stats")
+ s = stats["stats"]
+ self.failUnlessEqual(s["count-immutable-files"], 2)
+ self.failUnlessEqual(s["count-literal-files"], 1)
+ self.failUnlessEqual(s["count-directories"], 1)
+ d.addCallback(_done)
+
+ d.addErrback(self.explain_web_error)
+ return d
from allmydata.interfaces import ICheckAndRepairResults, ICheckResults
from allmydata.util import base32, idlib
+def json_check_counts(d):
+ r = {}
+ r["count-shares-good"] = d["count-shares-good"]
+ r["count-shares-needed"] = d["count-shares-needed"]
+ r["count-shares-expected"] = d["count-shares-expected"]
+ r["count-good-share-hosts"] = d["count-good-share-hosts"]
+ r["count-corrupt-shares"] = d["count-corrupt-shares"]
+ r["list-corrupt-shares"] = [ (idlib.nodeid_b2a(serverid),
+ base32.b2a(si), shnum)
+ for (serverid, si, shnum)
+ in d["list-corrupt-shares"] ]
+ r["servers-responding"] = [idlib.nodeid_b2a(serverid)
+ for serverid in d["servers-responding"]]
+ sharemap = {}
+ for (shareid, serverids) in d["sharemap"].items():
+ sharemap[shareid] = [idlib.nodeid_b2a(serverid)
+ for serverid in serverids]
+ r["sharemap"] = sharemap
+
+ r["count-wrong-shares"] = d["count-wrong-shares"]
+ r["count-recoverable-versions"] = d["count-recoverable-versions"]
+ r["count-unrecoverable-versions"] = d["count-unrecoverable-versions"]
+
+ return r
+
+def json_check_results(r):
+ if r is None:
+ # LIT file
+ data = {"storage-index": "",
+ "results": {"healthy": True},
+ }
+ return data
+ data = {}
+ data["storage-index"] = r.get_storage_index_string()
+ data["summary"] = r.get_summary()
+ data["results"] = json_check_counts(r.get_data())
+ data["results"]["needs-rebalancing"] = r.needs_rebalancing()
+ data["results"]["healthy"] = r.is_healthy()
+ data["results"]["recoverable"] = r.is_recoverable()
+ return data
+
+def json_check_and_repair_results(r):
+ if r is None:
+ # LIT file
+ data = {"storage-index": "",
+ "repair-attempted": False,
+ }
+ return data
+ data = {}
+ data["storage-index"] = r.get_storage_index_string()
+ data["repair-attempted"] = r.get_repair_attempted()
+ data["repair-successful"] = r.get_repair_successful()
+ pre = r.get_pre_repair_results()
+ data["pre-repair-results"] = json_check_results(pre)
+ post = r.get_post_repair_results()
+ data["post-repair-results"] = json_check_results(post)
+ return data
+
class ResultsBase:
def _join_pathstring(self, path):
if path:
return T.ul[r]
- def _json_check_and_repair_results(self, r):
- data = {}
- data["storage-index"] = r.get_storage_index_string()
- data["repair-attempted"] = r.get_repair_attempted()
- data["repair-successful"] = r.get_repair_successful()
- pre = r.get_pre_repair_results()
- data["pre-repair-results"] = self._json_check_results(pre)
- post = r.get_post_repair_results()
- data["post-repair-results"] = self._json_check_results(post)
- return data
-
- def _json_check_results(self, r):
- data = {}
- data["storage-index"] = r.get_storage_index_string()
- data["summary"] = r.get_summary()
- data["results"] = self._json_check_counts(r.get_data())
- data["results"]["needs-rebalancing"] = r.needs_rebalancing()
- data["results"]["healthy"] = r.is_healthy()
- data["results"]["recoverable"] = r.is_recoverable()
- return data
-
- def _json_check_counts(self, d):
- r = {}
- r["count-shares-good"] = d["count-shares-good"]
- r["count-shares-needed"] = d["count-shares-needed"]
- r["count-shares-expected"] = d["count-shares-expected"]
- r["count-good-share-hosts"] = d["count-good-share-hosts"]
- r["count-corrupt-shares"] = d["count-corrupt-shares"]
- r["list-corrupt-shares"] = [ (idlib.nodeid_b2a(serverid),
- base32.b2a(si), shnum)
- for (serverid, si, shnum)
- in d["list-corrupt-shares"] ]
- r["servers-responding"] = [idlib.nodeid_b2a(serverid)
- for serverid in d["servers-responding"]]
- sharemap = {}
- for (shareid, serverids) in d["sharemap"].items():
- sharemap[shareid] = [idlib.nodeid_b2a(serverid)
- for serverid in serverids]
- r["sharemap"] = sharemap
-
- r["count-wrong-shares"] = d["count-wrong-shares"]
- r["count-recoverable-versions"] = d["count-recoverable-versions"]
- r["count-unrecoverable-versions"] = d["count-unrecoverable-versions"]
-
- return r
-
def _html(self, s):
if isinstance(s, (str, unicode)):
return html.escape(s)
def json(self, ctx):
inevow.IRequest(ctx).setHeader("content-type", "text/plain")
- data = self._json_check_results(self.r)
+ data = json_check_results(self.r)
return simplejson.dumps(data, indent=1) + "\n"
def render_summary(self, ctx, data):
def json(self, ctx):
inevow.IRequest(ctx).setHeader("content-type", "text/plain")
- data = self._json_check_and_repair_results(self.r)
+ data = json_check_and_repair_results(self.r)
return simplejson.dumps(data, indent=1) + "\n"
def render_summary(self, ctx, data):
shnum)
for (serverid, storage_index, shnum)
in res.get_corrupt_shares() ]
- data["list-unhealthy-files"] = [ (path_t, self._json_check_results(r))
+ data["list-unhealthy-files"] = [ (path_t, json_check_results(r))
for (path_t, r)
in res.get_all_results().items()
if not r.is_healthy() ]
data["list-remaining-corrupt-shares"] = remaining_corrupt
unhealthy = [ (path_t,
- self._json_check_results(crr.get_pre_repair_results()))
+ json_check_results(crr.get_pre_repair_results()))
for (path_t, crr)
in res.get_all_results().items()
if not crr.get_pre_repair_results().is_healthy() ]
CheckAndRepairResults, DeepCheckResults, DeepCheckAndRepairResults
from allmydata.web.info import MoreInfo
from allmydata.web.operations import ReloadMixin
+from allmydata.web.check_results import json_check_results, \
+ json_check_and_repair_results
class BlockingFileError(Exception):
# TODO: catch and transform
d = self._POST_check(req)
elif t == "start-deep-check":
d = self._POST_start_deep_check(ctx)
+ elif t == "stream-deep-check":
+ d = self._POST_stream_deep_check(ctx)
elif t == "start-manifest":
d = self._POST_start_manifest(ctx)
elif t == "start-deep-size":
renderer = DeepCheckResults(monitor)
return self._start_operation(monitor, renderer, ctx)
+ def _POST_stream_deep_check(self, ctx):
+ verify = boolean_of_arg(get_arg(ctx, "verify", "false"))
+ repair = boolean_of_arg(get_arg(ctx, "repair", "false"))
+ walker = DeepCheckStreamer(ctx, self.node, verify, repair)
+ monitor = self.node.deep_traverse(walker)
+ walker.setMonitor(monitor)
+ # register to hear stopProducing. The walker ignores pauseProducing.
+ IRequest(ctx).registerProducer(walker, True)
+ d = monitor.when_done()
+ def _done(res):
+ IRequest(ctx).unregisterProducer()
+ return res
+ d.addBoth(_done)
+ def _cancelled(f):
+ f.trap(OperationCancelledError)
+ return "Operation Cancelled"
+ d.addErrback(_cancelled)
+ return d
+
def _POST_start_manifest(self, ctx):
if not get_arg(ctx, "ophandle"):
raise NeedOperationHandleError("slow operation requires ophandle=")
assert "\n" not in j
self.req.write(j+"\n")
return ""
+
+class DeepCheckStreamer(dirnode.DeepStats):
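+    # Writes one JSON line per unique object (parent-first), then a final
+    # "stats" line. Subclassing dirnode.DeepStats gives us the deep-stats
+    # accumulation for free: the deep-traverse machinery calls add_node()
+    # once per object and finish() when the walk is complete.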
+ implements(IPushProducer)
+
+ def __init__(self, ctx, origin, verify, repair):
+ dirnode.DeepStats.__init__(self, origin)
+ self.req = IRequest(ctx)
+ self.verify = verify
+ self.repair = repair
+
+ def setMonitor(self, monitor):
+ self.monitor = monitor
+ def pauseProducing(self):
+ pass
+ def resumeProducing(self):
+ pass
+ def stopProducing(self):
+ self.monitor.cancel()
+
+ def add_node(self, node, path):
+ dirnode.DeepStats.add_node(self, node, path)
+ data = {"path": path,
+ "cap": node.get_uri()}
+
+ if IDirectoryNode.providedBy(node):
+ data["type"] = "directory"
+ else:
+ data["type"] = "file"
+
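+        # LIT files have no verifycap, repaircap, or storage index, so
+        # these keys will serialize to null for them.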
+ v = node.get_verify_cap()
+ if v:
+ v = v.to_string()
+ data["verifycap"] = v
+
+ r = node.get_repair_cap()
+ if r:
+ r = r.to_string()
+ data["repaircap"] = r
+
+ si = node.get_storage_index()
+ if si:
+ si = base32.b2a(si)
+ data["storage-index"] = si
+
+ if self.repair:
+ d = node.check_and_repair(self.monitor, self.verify)
+ d.addCallback(self.add_check_and_repair, data)
+ else:
+ d = node.check(self.monitor, self.verify)
+ d.addCallback(self.add_check, data)
+ d.addCallback(self.write_line)
+ return d
+
+ def add_check_and_repair(self, crr, data):
+ data["check-and-repair-results"] = json_check_and_repair_results(crr)
+ return data
+
+ def add_check(self, cr, data):
+ data["check-results"] = json_check_results(cr)
+ return data
+
+ def write_line(self, data):
+ j = simplejson.dumps(data, ensure_ascii=True)
+ assert "\n" not in j
+ self.req.write(j+"\n")
+
+ def finish(self):
+ stats = dirnode.DeepStats.get_results(self)
+ d = {"type": "stats",
+ "stats": stats,
+ }
+ j = simplejson.dumps(d, ensure_ascii=True)
+ assert "\n" not in j
+ self.req.write(j+"\n")
+ return ""