From: Brian Warner <>
Date: Fri, 23 Jan 2009 05:01:36 +0000 (-0700)
Subject: #590: add webish t=stream-manifest
X-Git-Tag: allmydata-tahoe-1.3.0~156

#590: add webish t=stream-manifest

diff --git a/docs/frontends/webapi.txt b/docs/frontends/webapi.txt
index 8cc9288f..86da8807 100644
--- a/docs/frontends/webapi.txt
+++ b/docs/frontends/webapi.txt
@@ -257,6 +257,13 @@ If a retain-for= argument is not used, the default handle lifetimes are:
  * collected handles (i.e. the GET page has been retrieved at least once
    since the operation completed) will remain valid for ten minutes.
+Many "slow" operations can begin to use unacceptable amounts of memory when
+operation on large directory structures. The memory usage increases when the
+ophandle is polled, as the results must be copied into a JSON string, sent
+over the wire, then parsed by a client. So, as an alternative, many "slow"
+operations have streaming equivalents. These equivalents do not use operation
+handles. Instead, they emit line-oriented status results immediately. Client
+code can cancel the operation by simply closing the HTTP connection.
 == Programmatic Operations ==
@@ -1043,6 +1050,37 @@ POST $DIRURL?t=start-deep-stats    (must add &ophandle=XYZ)
    share management data (leases)
    backend (ext3) minimum block size
+POST $URL?t=stream-manifest
+ This operation performs a recursive walk of all files and directories
+ reachable from the given starting point. For each such unique object
+ (duplicates are skipped), a single line of JSON is emitted to the HTTP
+ response channel. When the walk is complete, a final line of JSON is emitted
+ which contains the accumulated file-size/count "deep-stats" data.
+ A CLI tool can split the response stream on newlines into "response units",
+ and parse each response unit as JSON. Each such parsed unit will be a
+ dictionary, and will contain at least the "type" key: a string, one of
+ "file", "directory", or "stats".
+ For all units that have a type of "file" or "directory", the dictionary will
+ contain the following keys:
+  "path": a list of strings, with the path that is traversed to reach the
+          object
+  "cap": a writecap for the file or directory, if available, else a readcap
+  "verifycap": a verifycap for the file or directory
+  "repaircap": the weakest cap which can still be used to repair the object
+  "storage-index": a base32 storage index for the object
+ Note that non-distributed files (i.e. LIT files) will have values of None
+ for verifycap, repaircap, and storage-index, since these files can neither
+ be verified nor repaired, and are not stored on the storage servers.
+ The last unit in the stream will have a type of "stats", and will contain
+ the keys described in the "start-deep-stats" operation, below.
 == Other Useful Pages ==
 The portion of the web namespace that begins with "/uri" (and "/named") is
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 747bd74c..5c40ba3e 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -48,6 +48,8 @@ class FakeCHKFileNode:
         return self.my_uri.to_string()
     def get_verify_cap(self):
         return self.my_uri.get_verify_cap()
+    def get_repair_cap(self):
+        return self.my_uri.get_verify_cap()
     def get_storage_index(self):
         return self.storage_index
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 7a7e9333..d192bcb7 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -1937,6 +1937,13 @@ class DeepCheckBase(SystemTestMixin, ErrorMixin):
   "%s: not JSON: '%s'" % (url, s))
         return data
+    def parse_streamed_json(self, s):
+        for unit in s.split("\n"):
+            if not unit:
+                # stream should end with a newline, so split returns ""
+                continue
+            yield simplejson.loads(unit)
     def web(self, n, method="GET", **kwargs):
         # returns (data, url)
         url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
@@ -2100,6 +2107,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
         d = self.set_up_nodes()
+        d.addCallback(self.do_web_stream_manifest)
@@ -2136,6 +2144,45 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
         self.failUnlessEqual(s["size-immutable-files"], 13000)
         self.failUnlessEqual(s["size-literal-files"], 48)
+    def do_web_stream_manifest(self, ignored):
+        d = self.web(self.root, method="POST", t="stream-manifest")
+        def _check((res,url)):
+            units = list(self.parse_streamed_json(res))
+            files = [u for u in units if u["type"] in ("file", "directory")]
+            assert units[-1]["type"] == "stats"
+            stats = units[-1]["stats"]
+            self.failUnlessEqual(len(files), 5)
+            # [root,mutable,large] are distributed, [small,small2] are not
+            self.failUnlessEqual(len([f for f in files
+                                      if f["verifycap"] is not None]), 3)
+            self.failUnlessEqual(len([f for f in files
+                                      if f["verifycap"] is None]), 2)
+            self.failUnlessEqual(len([f for f in files
+                                      if f["repaircap"] is not None]), 3)
+            self.failUnlessEqual(len([f for f in files
+                                      if f["repaircap"] is None]), 2)
+            self.failUnlessEqual(len([f for f in files
+                                      if f["storage-index"] is not None]), 3)
+            self.failUnlessEqual(len([f for f in files
+                                      if f["storage-index"] is None]), 2)
+            # make sure that a mutable file has filecap==repaircap!=verifycap
+            mutable = [f for f in files
+                       if f["cap"] is not None
+                       and f["cap"].startswith("URI:SSK:")][0]
+            self.failUnlessEqual(mutable["cap"], self.mutable_uri)
+            self.failIfEqual(mutable["cap"], mutable["verifycap"])
+            self.failUnlessEqual(mutable["cap"], mutable["repaircap"])
+            # for immutable file, verifycap==repaircap!=filecap
+            large = [f for f in files
+                       if f["cap"] is not None
+                       and f["cap"].startswith("URI:CHK:")][0]
+            self.failUnlessEqual(large["cap"], self.large_uri)
+            self.failIfEqual(large["cap"], large["verifycap"])
+            self.failUnlessEqual(large["verifycap"], large["repaircap"])
+            self.check_stats_good(stats)
+        d.addCallback(_check)
+        return d
     def do_test_check_good(self, ignored):
         d = defer.succeed(None)
         # check the individual items
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 2711a0e6..5967e41d 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -168,6 +168,7 @@ class WebMixin(object):
             foo.set_uri(unicode_filename, self._bar_txt_uri)
             _ign, n, baz_file = self.makefile(2)
+            self._baz_file_uri = baz_file
             sub.set_uri(u"baz.txt", baz_file)
             _ign, n, self._bad_file_uri = self.makefile(3)
@@ -1050,6 +1051,26 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase):
         return d
+    def test_POST_DIRURL_stream_manifest(self):
+        d = self.POST(self.public_url + "/foo/?t=stream-manifest")
+        def _check(res):
+            self.failUnless(res.endswith("\n"))
+            units = [simplejson.loads(t) for t in res[:-1].split("\n")]
+            self.failUnlessEqual(len(units), 7)
+            self.failUnlessEqual(units[-1]["type"], "stats")
+            first = units[0]
+            self.failUnlessEqual(first["path"], [])
+            self.failUnlessEqual(first["cap"], self._foo_uri)
+            self.failUnlessEqual(first["type"], "directory")
+            baz = [u for u in units[:-1] if u["cap"] == self._baz_file_uri][0]
+            self.failUnlessEqual(baz["path"], ["sub", "baz.txt"])
+            self.failIfEqual(baz["storage-index"], None)
+            self.failIfEqual(baz["verifycap"], None)
+            self.failIfEqual(baz["repaircap"], None)
+            return
+        d.addCallback(_check)
+        return d
     def test_GET_DIRURL_uri(self):
         d = self.GET(self.public_url + "/foo?t=uri")
         def _check(res):
diff --git a/src/allmydata/web/ b/src/allmydata/web/
index a76f7090..27f8ac9c 100644
--- a/src/allmydata/web/
+++ b/src/allmydata/web/
@@ -16,6 +16,7 @@ from allmydata.uri import from_string_dirnode
 from allmydata.interfaces import IDirectoryNode, IFileNode, IMutableFileNode, \
      ExistingChildError, NoSuchChildError
 from allmydata.monitor import Monitor
+from allmydata import dirnode
 from allmydata.web.common import text_plain, WebError, \
      IClient, IOpHandleTable, NeedOperationHandleError, \
      boolean_of_arg, get_arg, get_root, \
@@ -192,6 +193,8 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
             d = self._POST_start_deep_size(ctx)
         elif t == "start-deep-stats":
             d = self._POST_start_deep_stats(ctx)
+        elif t == "stream-manifest":
+            d = self._POST_stream_manifest(ctx)
         elif t == "set_children":
             # TODO: docs
             d = self._POST_set_children(req)
@@ -394,6 +397,11 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
         renderer = DeepStatsResults(monitor)
         return self._start_operation(monitor, renderer, ctx)
+    def _POST_stream_manifest(self, ctx):
+        walker = ManifestStreamer(ctx, self.node)
+        monitor = self.node.deep_traverse(walker)
+        return monitor.when_done()
     def _POST_set_children(self, req):
         replace = boolean_of_arg(get_arg(req, "replace", "true"))
@@ -812,3 +820,48 @@ class DeepStatsResults(rend.Page):
         s = self.monitor.get_status().copy()
         s["finished"] = self.monitor.is_finished()
         return simplejson.dumps(s, indent=1)
+class ManifestStreamer(dirnode.DeepStats):
+    def __init__(self, ctx, origin):
+        dirnode.DeepStats.__init__(self, origin)
+        self.req = IRequest(ctx)
+    def add_node(self, node, path):
+        dirnode.DeepStats.add_node(self, node, path)
+        d = {"path": path,
+             "cap": node.get_uri()}
+        if IDirectoryNode.providedBy(node):
+            d["type"] = "directory"
+        else:
+            d["type"] = "file"
+        v = node.get_verify_cap()
+        if v:
+            v = v.to_string()
+        d["verifycap"] = v
+        r = node.get_repair_cap()
+        if r:
+            r = r.to_string()
+        d["repaircap"] = r
+        si = node.get_storage_index()
+        if si:
+            si = base32.b2a(si)
+        d["storage-index"] = si
+        j = simplejson.dumps(d, ensure_ascii=True)
+        assert "\n" not in j
+        self.req.write(j+"\n")
+    def finish(self):
+        stats = dirnode.DeepStats.get_results(self)
+        d = {"type": "stats",
+             "stats": stats,
+             }
+        j = simplejson.dumps(d, ensure_ascii=True)
+        assert "\n" not in j
+        self.req.write(j+"\n")
+        return ""