From 476a5c8fac9909d53bd3b5b93eb2f2903f347b62 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Mon, 16 Feb 2009 23:35:53 -0700
Subject: [PATCH] webapi #590: add streaming deep-check. Still need a CLI tool
 to use it.

---
 docs/frontends/webapi.txt          |  39 ++++++++
 src/allmydata/test/test_web.py     | 147 +++++++++++++++++++++++++++++
 src/allmydata/web/check_results.py | 112 ++++++++++++----------
 src/allmydata/web/directory.py     |  98 +++++++++++++++++++
 4 files changed, 346 insertions(+), 50 deletions(-)

diff --git a/docs/frontends/webapi.txt b/docs/frontends/webapi.txt
index 46eaf77f..0caa1eaa 100644
--- a/docs/frontends/webapi.txt
+++ b/docs/frontends/webapi.txt
@@ -885,6 +885,45 @@ POST $URL?t=start-deep-check    (must add &ophandle=XYZ)
    stats: a dictionary with the same keys as the t=start-deep-stats command
           (described below)
 
+POST $URL?t=stream-deep-check
+
+ This initiates a recursive walk of all files and directories reachable from
+ the target, performing a check on each one just like t=check. For each
+ unique object (duplicates are skipped), a single line of JSON is emitted to
+ the HTTP response channel. When the walk is complete, a final line of JSON
+ is emitted which contains the accumulated file-size/count "deep-stats" data.
+
+ This command takes the same arguments as t=start-deep-check.
+
+ A CLI tool can split the response stream on newlines into "response units",
+ and parse each response unit as JSON. Each such parsed unit will be a
+ dictionary, and will contain at least the "type" key: a string, one of
+ "file", "directory", or "stats".
+
+ For all units that have a type of "file" or "directory", the dictionary will
+ contain the following keys:
+
+  "path": a list of strings, with the path that is traversed to reach the
+          object
+  "cap": a writecap for the file or directory, if available, else a readcap
+  "verifycap": a verifycap for the file or directory
+  "repaircap": the weakest cap which can still be used to repair the object
+  "storage-index": a base32 storage index for the object
+  "check-results": a copy of the dictionary which would be returned by
+                   t=check&output=json, with three top-level keys:
+                   "storage-index", "summary", and "results", and a variety
+                   of counts and sharemaps in the "results" value.
+
+ Note that non-distributed files (i.e. LIT files) will have values of None
+ for verifycap, repaircap, and storage-index, since these files can neither
+ be verified nor repaired, and are not stored on the storage servers.
+ Likewise the check-results dictionary will be limited: an empty string for
+ storage-index, and a results dictionary with only the "healthy" key.
+
+ The last unit in the stream will have a type of "stats", and will contain
+ the keys described in the "start-deep-stats" operation, below.
+
+
 POST $URL?t=check&repair=true
 
   This performs a health check of the given file or directory, and if the
diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py
index 68319c09..a749e0c5 100644
--- a/src/allmydata/test/test_web.py
+++ b/src/allmydata/test/test_web.py
@@ -2793,3 +2793,150 @@ class Grid(GridTestMixin, WebErrorMixin, unittest.TestCase):
 
         d.addErrback(self.explain_web_error)
         return d
+
+    def test_deep_check(self):
+        self.basedir = "web/Grid/deep_check"
+        self.set_up_grid()
+        c0 = self.g.clients[0]
+        self.uris = {}
+        self.fileurls = {}
+        DATA = "data" * 100
+        d = c0.create_empty_dirnode()
+        def _stash_root_and_create_file(n):
+            self.rootnode = n
+            self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/"
+            return n.add_file(u"good", upload.Data(DATA, convergence=""))
+        d.addCallback(_stash_root_and_create_file)
+        def _stash_uri(fn, which):
+            self.uris[which] = fn.get_uri()
+        d.addCallback(_stash_uri, "good")
+        d.addCallback(lambda ign:
+                      self.rootnode.add_file(u"small",
+                                             upload.Data("literal",
+                                                        convergence="")))
+        d.addCallback(_stash_uri, "small")
+
+        d.addCallback(self.CHECK, "root", "t=stream-deep-check")
+        def _done(res):
+            units = [simplejson.loads(line)
+                     for line in res.splitlines()
+                     if line]
+            self.failUnlessEqual(len(units), 3+1)
+            # should be parent-first
+            u0 = units[0]
+            self.failUnlessEqual(u0["path"], [])
+            self.failUnlessEqual(u0["type"], "directory")
+            self.failUnlessEqual(u0["cap"], self.rootnode.get_uri())
+            u0cr = u0["check-results"]
+            self.failUnlessEqual(u0cr["results"]["count-shares-good"], 10)
+
+            ugood = [u for u in units
+                     if u["type"] == "file" and u["path"] == [u"good"]][0]
+            self.failUnlessEqual(ugood["cap"], self.uris["good"])
+            ugoodcr = ugood["check-results"]
+            self.failUnlessEqual(ugoodcr["results"]["count-shares-good"], 10)
+
+            stats = units[-1]
+            self.failUnlessEqual(stats["type"], "stats")
+            s = stats["stats"]
+            self.failUnlessEqual(s["count-immutable-files"], 1)
+            self.failUnlessEqual(s["count-literal-files"], 1)
+            self.failUnlessEqual(s["count-directories"], 1)
+        d.addCallback(_done)
+
+        d.addErrback(self.explain_web_error)
+        return d
+
+    def test_deep_check_and_repair(self):
+        self.basedir = "web/Grid/deep_check_and_repair"
+        self.set_up_grid()
+        c0 = self.g.clients[0]
+        self.uris = {}
+        self.fileurls = {}
+        DATA = "data" * 100
+        d = c0.create_empty_dirnode()
+        def _stash_root_and_create_file(n):
+            self.rootnode = n
+            self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/"
+            return n.add_file(u"good", upload.Data(DATA, convergence=""))
+        d.addCallback(_stash_root_and_create_file)
+        def _stash_uri(fn, which):
+            self.uris[which] = fn.get_uri()
+        d.addCallback(_stash_uri, "good")
+        d.addCallback(lambda ign:
+                      self.rootnode.add_file(u"small",
+                                             upload.Data("literal",
+                                                        convergence="")))
+        d.addCallback(_stash_uri, "small")
+        d.addCallback(lambda ign:
+                      self.rootnode.add_file(u"sick",
+                                             upload.Data(DATA+"1",
+                                                        convergence="")))
+        d.addCallback(_stash_uri, "sick")
+        #d.addCallback(lambda ign:
+        #              self.rootnode.add_file(u"dead",
+        #                                     upload.Data(DATA+"2",
+        #                                                convergence="")))
+        #d.addCallback(_stash_uri, "dead")
+
+        #d.addCallback(lambda ign: c0.create_mutable_file("mutable"))
+        #d.addCallback(lambda fn: self.rootnode.set_node(u"corrupt", fn))
+        #d.addCallback(_stash_uri, "corrupt")
+
+        def _clobber_shares(ignored):
+            good_shares = self.find_shares(self.uris["good"])
+            self.failUnlessEqual(len(good_shares), 10)
+            sick_shares = self.find_shares(self.uris["sick"])
+            os.unlink(sick_shares[0][2])
+            #dead_shares = self.find_shares(self.uris["dead"])
+            #for i in range(1, 10):
+            #    os.unlink(dead_shares[i][2])
+
+            #c_shares = self.find_shares(self.uris["corrupt"])
+            #cso = CorruptShareOptions()
+            #cso.stdout = StringIO()
+            #cso.parseOptions([c_shares[0][2]])
+            #corrupt_share(cso)
+        d.addCallback(_clobber_shares)
+
+        d.addCallback(self.CHECK, "root", "t=stream-deep-check&repair=true")
+        def _done(res):
+            units = [simplejson.loads(line)
+                     for line in res.splitlines()
+                     if line]
+            self.failUnlessEqual(len(units), 4+1)
+            # should be parent-first
+            u0 = units[0]
+            self.failUnlessEqual(u0["path"], [])
+            self.failUnlessEqual(u0["type"], "directory")
+            self.failUnlessEqual(u0["cap"], self.rootnode.get_uri())
+            u0crr = u0["check-and-repair-results"]
+            self.failUnlessEqual(u0crr["repair-attempted"], False)
+            self.failUnlessEqual(u0crr["pre-repair-results"]["results"]["count-shares-good"], 10)
+
+            ugood = [u for u in units
+                     if u["type"] == "file" and u["path"] == [u"good"]][0]
+            self.failUnlessEqual(ugood["cap"], self.uris["good"])
+            ugoodcrr = ugood["check-and-repair-results"]
+            self.failUnlessEqual(u0crr["repair-attempted"], False)
+            self.failUnlessEqual(u0crr["pre-repair-results"]["results"]["count-shares-good"], 10)
+
+            usick = [u for u in units
+                     if u["type"] == "file" and u["path"] == [u"sick"]][0]
+            self.failUnlessEqual(usick["cap"], self.uris["sick"])
+            usickcrr = usick["check-and-repair-results"]
+            self.failUnlessEqual(usickcrr["repair-attempted"], True)
+            self.failUnlessEqual(usickcrr["repair-successful"], True)
+            self.failUnlessEqual(usickcrr["pre-repair-results"]["results"]["count-shares-good"], 9)
+            self.failUnlessEqual(usickcrr["post-repair-results"]["results"]["count-shares-good"], 10)
+
+            stats = units[-1]
+            self.failUnlessEqual(stats["type"], "stats")
+            s = stats["stats"]
+            self.failUnlessEqual(s["count-immutable-files"], 2)
+            self.failUnlessEqual(s["count-literal-files"], 1)
+            self.failUnlessEqual(s["count-directories"], 1)
+        d.addCallback(_done)
+
+        d.addErrback(self.explain_web_error)
+        return d
diff --git a/src/allmydata/web/check_results.py b/src/allmydata/web/check_results.py
index d8af3648..bf7d75db 100644
--- a/src/allmydata/web/check_results.py
+++ b/src/allmydata/web/check_results.py
@@ -9,6 +9,64 @@ from allmydata.web.operations import ReloadMixin
 from allmydata.interfaces import ICheckAndRepairResults, ICheckResults
 from allmydata.util import base32, idlib
 
+def json_check_counts(d):
+    r = {}
+    r["count-shares-good"] = d["count-shares-good"]
+    r["count-shares-needed"] = d["count-shares-needed"]
+    r["count-shares-expected"] = d["count-shares-expected"]
+    r["count-good-share-hosts"] = d["count-good-share-hosts"]
+    r["count-corrupt-shares"] = d["count-corrupt-shares"]
+    r["list-corrupt-shares"] = [ (idlib.nodeid_b2a(serverid),
+                                  base32.b2a(si), shnum)
+                                 for (serverid, si, shnum)
+                                 in d["list-corrupt-shares"] ]
+    r["servers-responding"] = [idlib.nodeid_b2a(serverid)
+                               for serverid in d["servers-responding"]]
+    sharemap = {}
+    for (shareid, serverids) in d["sharemap"].items():
+        sharemap[shareid] = [idlib.nodeid_b2a(serverid)
+                             for serverid in serverids]
+    r["sharemap"] = sharemap
+
+    r["count-wrong-shares"] = d["count-wrong-shares"]
+    r["count-recoverable-versions"] = d["count-recoverable-versions"]
+    r["count-unrecoverable-versions"] = d["count-unrecoverable-versions"]
+
+    return r
+
+def json_check_results(r):
+    if r is None:
+        # LIT file
+        data = {"storage-index": "",
+                "results": {"healthy": True},
+                }
+        return data
+    data = {}
+    data["storage-index"] = r.get_storage_index_string()
+    data["summary"] = r.get_summary()
+    data["results"] = json_check_counts(r.get_data())
+    data["results"]["needs-rebalancing"] = r.needs_rebalancing()
+    data["results"]["healthy"] = r.is_healthy()
+    data["results"]["recoverable"] = r.is_recoverable()
+    return data
+
+def json_check_and_repair_results(r):
+    if r is None:
+        # LIT file
+        data = {"storage-index": "",
+                "repair-attempted": False,
+                }
+        return data
+    data = {}
+    data["storage-index"] = r.get_storage_index_string()
+    data["repair-attempted"] = r.get_repair_attempted()
+    data["repair-successful"] = r.get_repair_successful()
+    pre = r.get_pre_repair_results()
+    data["pre-repair-results"] = json_check_results(pre)
+    post = r.get_post_repair_results()
+    data["post-repair-results"] = json_check_results(post)
+    return data
+
 class ResultsBase:
     def _join_pathstring(self, path):
         if path:
@@ -94,52 +152,6 @@ class ResultsBase:
 
         return T.ul[r]
 
-    def _json_check_and_repair_results(self, r):
-        data = {}
-        data["storage-index"] = r.get_storage_index_string()
-        data["repair-attempted"] = r.get_repair_attempted()
-        data["repair-successful"] = r.get_repair_successful()
-        pre = r.get_pre_repair_results()
-        data["pre-repair-results"] = self._json_check_results(pre)
-        post = r.get_post_repair_results()
-        data["post-repair-results"] = self._json_check_results(post)
-        return data
-
-    def _json_check_results(self, r):
-        data = {}
-        data["storage-index"] = r.get_storage_index_string()
-        data["summary"] = r.get_summary()
-        data["results"] = self._json_check_counts(r.get_data())
-        data["results"]["needs-rebalancing"] = r.needs_rebalancing()
-        data["results"]["healthy"] = r.is_healthy()
-        data["results"]["recoverable"] = r.is_recoverable()
-        return data
-
-    def _json_check_counts(self, d):
-        r = {}
-        r["count-shares-good"] = d["count-shares-good"]
-        r["count-shares-needed"] = d["count-shares-needed"]
-        r["count-shares-expected"] = d["count-shares-expected"]
-        r["count-good-share-hosts"] = d["count-good-share-hosts"]
-        r["count-corrupt-shares"] = d["count-corrupt-shares"]
-        r["list-corrupt-shares"] = [ (idlib.nodeid_b2a(serverid),
-                                      base32.b2a(si), shnum)
-                                     for (serverid, si, shnum)
-                                     in d["list-corrupt-shares"] ]
-        r["servers-responding"] = [idlib.nodeid_b2a(serverid)
-                                   for serverid in d["servers-responding"]]
-        sharemap = {}
-        for (shareid, serverids) in d["sharemap"].items():
-            sharemap[shareid] = [idlib.nodeid_b2a(serverid)
-                                 for serverid in serverids]
-        r["sharemap"] = sharemap
-
-        r["count-wrong-shares"] = d["count-wrong-shares"]
-        r["count-recoverable-versions"] = d["count-recoverable-versions"]
-        r["count-unrecoverable-versions"] = d["count-unrecoverable-versions"]
-
-        return r
-
     def _html(self, s):
         if isinstance(s, (str, unicode)):
             return html.escape(s)
@@ -210,7 +222,7 @@ class CheckResults(CheckerBase, rend.Page, ResultsBase):
 
     def json(self, ctx):
         inevow.IRequest(ctx).setHeader("content-type", "text/plain")
-        data = self._json_check_results(self.r)
+        data = json_check_results(self.r)
         return simplejson.dumps(data, indent=1) + "\n"
 
     def render_summary(self, ctx, data):
@@ -249,7 +261,7 @@ class CheckAndRepairResults(CheckerBase, rend.Page, ResultsBase):
 
     def json(self, ctx):
         inevow.IRequest(ctx).setHeader("content-type", "text/plain")
-        data = self._json_check_and_repair_results(self.r)
+        data = json_check_and_repair_results(self.r)
         return simplejson.dumps(data, indent=1) + "\n"
 
     def render_summary(self, ctx, data):
@@ -324,7 +336,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin):
                                          shnum)
                                         for (serverid, storage_index, shnum)
                                         in res.get_corrupt_shares() ]
-        data["list-unhealthy-files"] = [ (path_t, self._json_check_results(r))
+        data["list-unhealthy-files"] = [ (path_t, json_check_results(r))
                                          for (path_t, r)
                                          in res.get_all_results().items()
                                          if not r.is_healthy() ]
@@ -496,7 +508,7 @@ class DeepCheckAndRepairResults(rend.Page, ResultsBase, ReloadMixin):
         data["list-remaining-corrupt-shares"] = remaining_corrupt
 
         unhealthy = [ (path_t,
-                       self._json_check_results(crr.get_pre_repair_results()))
+                       json_check_results(crr.get_pre_repair_results()))
                       for (path_t, crr)
                       in res.get_all_results().items()
                       if not crr.get_pre_repair_results().is_healthy() ]
diff --git a/src/allmydata/web/directory.py b/src/allmydata/web/directory.py
index 2f00f088..367ed9e4 100644
--- a/src/allmydata/web/directory.py
+++ b/src/allmydata/web/directory.py
@@ -30,6 +30,8 @@ from allmydata.web.check_results import CheckResults, \
      CheckAndRepairResults, DeepCheckResults, DeepCheckAndRepairResults
 from allmydata.web.info import MoreInfo
 from allmydata.web.operations import ReloadMixin
+from allmydata.web.check_results import json_check_results, \
+     json_check_and_repair_results
 
 class BlockingFileError(Exception):
     # TODO: catch and transform
@@ -189,6 +191,8 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
             d = self._POST_check(req)
         elif t == "start-deep-check":
             d = self._POST_start_deep_check(ctx)
+        elif t == "stream-deep-check":
+            d = self._POST_stream_deep_check(ctx)
         elif t == "start-manifest":
             d = self._POST_start_manifest(ctx)
         elif t == "start-deep-size":
@@ -378,6 +382,25 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
             renderer = DeepCheckResults(monitor)
         return self._start_operation(monitor, renderer, ctx)
 
+    def _POST_stream_deep_check(self, ctx):
+        verify = boolean_of_arg(get_arg(ctx, "verify", "false"))
+        repair = boolean_of_arg(get_arg(ctx, "repair", "false"))
+        walker = DeepCheckStreamer(ctx, self.node, verify, repair)
+        monitor = self.node.deep_traverse(walker)
+        walker.setMonitor(monitor)
+        # register to hear stopProducing. The walker ignores pauseProducing.
+        IRequest(ctx).registerProducer(walker, True)
+        d = monitor.when_done()
+        def _done(res):
+            IRequest(ctx).unregisterProducer()
+            return res
+        d.addBoth(_done)
+        def _cancelled(f):
+            f.trap(OperationCancelledError)
+            return "Operation Cancelled"
+        d.addErrback(_cancelled)
+        return d
+
     def _POST_start_manifest(self, ctx):
         if not get_arg(ctx, "ophandle"):
             raise NeedOperationHandleError("slow operation requires ophandle=")
@@ -903,3 +926,78 @@ class ManifestStreamer(dirnode.DeepStats):
         assert "\n" not in j
         self.req.write(j+"\n")
         return ""
+
+class DeepCheckStreamer(dirnode.DeepStats):
+    implements(IPushProducer)
+
+    def __init__(self, ctx, origin, verify, repair):
+        dirnode.DeepStats.__init__(self, origin)
+        self.req = IRequest(ctx)
+        self.verify = verify
+        self.repair = repair
+
+    def setMonitor(self, monitor):
+        self.monitor = monitor
+    def pauseProducing(self):
+        pass
+    def resumeProducing(self):
+        pass
+    def stopProducing(self):
+        self.monitor.cancel()
+
+    def add_node(self, node, path):
+        dirnode.DeepStats.add_node(self, node, path)
+        data = {"path": path,
+                "cap": node.get_uri()}
+
+        if IDirectoryNode.providedBy(node):
+            data["type"] = "directory"
+        else:
+            data["type"] = "file"
+
+        v = node.get_verify_cap()
+        if v:
+            v = v.to_string()
+        data["verifycap"] = v
+
+        r = node.get_repair_cap()
+        if r:
+            r = r.to_string()
+        data["repaircap"] = r
+
+        si = node.get_storage_index()
+        if si:
+            si = base32.b2a(si)
+        data["storage-index"] = si
+
+        if self.repair:
+            d = node.check_and_repair(self.monitor, self.verify)
+            d.addCallback(self.add_check_and_repair, data)
+        else:
+            d = node.check(self.monitor, self.verify)
+            d.addCallback(self.add_check, data)
+        d.addCallback(self.write_line)
+        return d
+
+    def add_check_and_repair(self, crr, data):
+        data["check-and-repair-results"] = json_check_and_repair_results(crr)
+        return data
+
+    def add_check(self, cr, data):
+        data["check-results"] = json_check_results(cr)
+        return data
+
+    def write_line(self, data):
+        j = simplejson.dumps(data, ensure_ascii=True)
+        assert "\n" not in j
+        self.req.write(j+"\n")
+
+    def finish(self):
+        stats = dirnode.DeepStats.get_results(self)
+        d = {"type": "stats",
+             "stats": stats,
+             }
+        j = simplejson.dumps(d, ensure_ascii=True)
+        assert "\n" not in j
+        self.req.write(j+"\n")
+        return ""
-- 
2.45.2