From 26260374e974c514b09bad79ec5e658e1d1a8f52 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Thu, 22 Jan 2009 22:01:36 -0700 Subject: [PATCH] #590: add webish t=stream-manifest --- docs/frontends/webapi.txt | 38 ++++++++++++++++++++++ src/allmydata/test/common.py | 2 ++ src/allmydata/test/test_system.py | 47 +++++++++++++++++++++++++++ src/allmydata/test/test_web.py | 21 ++++++++++++ src/allmydata/web/directory.py | 53 +++++++++++++++++++++++++++++++ 5 files changed, 161 insertions(+) diff --git a/docs/frontends/webapi.txt b/docs/frontends/webapi.txt index 8cc9288f..86da8807 100644 --- a/docs/frontends/webapi.txt +++ b/docs/frontends/webapi.txt @@ -257,6 +257,13 @@ If a retain-for= argument is not used, the default handle lifetimes are: * collected handles (i.e. the GET page has been retrieved at least once since the operation completed) will remain valid for ten minutes. +Many "slow" operations can begin to use unacceptable amounts of memory when +operation on large directory structures. The memory usage increases when the +ophandle is polled, as the results must be copied into a JSON string, sent +over the wire, then parsed by a client. So, as an alternative, many "slow" +operations have streaming equivalents. These equivalents do not use operation +handles. Instead, they emit line-oriented status results immediately. Client +code can cancel the operation by simply closing the HTTP connection. == Programmatic Operations == @@ -1043,6 +1050,37 @@ POST $DIRURL?t=start-deep-stats (must add &ophandle=XYZ) share management data (leases) backend (ext3) minimum block size +POST $URL?t=stream-manifest + + This operation performs a recursive walk of all files and directories + reachable from the given starting point. For each such unique object + (duplicates are skipped), a single line of JSON is emitted to the HTTP + response channel. When the walk is complete, a final line of JSON is emitted + which contains the accumulated file-size/count "deep-stats" data. + + A CLI tool can split the response stream on newlines into "response units", + and parse each response unit as JSON. Each such parsed unit will be a + dictionary, and will contain at least the "type" key: a string, one of + "file", "directory", or "stats". + + For all units that have a type of "file" or "directory", the dictionary will + contain the following keys: + + "path": a list of strings, with the path that is traversed to reach the + object + "cap": a writecap for the file or directory, if available, else a readcap + "verifycap": a verifycap for the file or directory + "repaircap": the weakest cap which can still be used to repair the object + "storage-index": a base32 storage index for the object + + Note that non-distributed files (i.e. LIT files) will have values of None + for verifycap, repaircap, and storage-index, since these files can neither + be verified nor repaired, and are not stored on the storage servers. + + The last unit in the stream will have a type of "stats", and will contain + the keys described in the "start-deep-stats" operation, below. + + == Other Useful Pages == The portion of the web namespace that begins with "/uri" (and "/named") is diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 747bd74c..5c40ba3e 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -48,6 +48,8 @@ class FakeCHKFileNode: return self.my_uri.to_string() def get_verify_cap(self): return self.my_uri.get_verify_cap() + def get_repair_cap(self): + return self.my_uri.get_verify_cap() def get_storage_index(self): return self.storage_index diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 7a7e9333..d192bcb7 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -1937,6 +1937,13 @@ class DeepCheckBase(SystemTestMixin, ErrorMixin): self.fail("%s: not JSON: '%s'" % (url, s)) return data + def parse_streamed_json(self, s): + for unit in s.split("\n"): + if not unit: + # stream should end with a newline, so split returns "" + continue + yield simplejson.loads(unit) + def web(self, n, method="GET", **kwargs): # returns (data, url) url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri()) @@ -2100,6 +2107,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): d = self.set_up_nodes() d.addCallback(self.set_up_tree) d.addCallback(self.do_stats) + d.addCallback(self.do_web_stream_manifest) d.addCallback(self.do_test_check_good) d.addCallback(self.do_test_web_good) d.addCallback(self.do_test_cli_good) @@ -2136,6 +2144,45 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): self.failUnlessEqual(s["size-immutable-files"], 13000) self.failUnlessEqual(s["size-literal-files"], 48) + def do_web_stream_manifest(self, ignored): + d = self.web(self.root, method="POST", t="stream-manifest") + def _check((res,url)): + units = list(self.parse_streamed_json(res)) + files = [u for u in units if u["type"] in ("file", "directory")] + assert units[-1]["type"] == "stats" + stats = units[-1]["stats"] + self.failUnlessEqual(len(files), 5) + # [root,mutable,large] are distributed, [small,small2] are not + self.failUnlessEqual(len([f for f in files + if f["verifycap"] is not None]), 3) + self.failUnlessEqual(len([f for f in files + if f["verifycap"] is None]), 2) + self.failUnlessEqual(len([f for f in files + if f["repaircap"] is not None]), 3) + self.failUnlessEqual(len([f for f in files + if f["repaircap"] is None]), 2) + self.failUnlessEqual(len([f for f in files + if f["storage-index"] is not None]), 3) + self.failUnlessEqual(len([f for f in files + if f["storage-index"] is None]), 2) + # make sure that a mutable file has filecap==repaircap!=verifycap + mutable = [f for f in files + if f["cap"] is not None + and f["cap"].startswith("URI:SSK:")][0] + self.failUnlessEqual(mutable["cap"], self.mutable_uri) + self.failIfEqual(mutable["cap"], mutable["verifycap"]) + self.failUnlessEqual(mutable["cap"], mutable["repaircap"]) + # for immutable file, verifycap==repaircap!=filecap + large = [f for f in files + if f["cap"] is not None + and f["cap"].startswith("URI:CHK:")][0] + self.failUnlessEqual(large["cap"], self.large_uri) + self.failIfEqual(large["cap"], large["verifycap"]) + self.failUnlessEqual(large["verifycap"], large["repaircap"]) + self.check_stats_good(stats) + d.addCallback(_check) + return d + def do_test_check_good(self, ignored): d = defer.succeed(None) # check the individual items diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 2711a0e6..5967e41d 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -168,6 +168,7 @@ class WebMixin(object): foo.set_uri(unicode_filename, self._bar_txt_uri) _ign, n, baz_file = self.makefile(2) + self._baz_file_uri = baz_file sub.set_uri(u"baz.txt", baz_file) _ign, n, self._bad_file_uri = self.makefile(3) @@ -1050,6 +1051,26 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase): d.addCallback(_got_json) return d + def test_POST_DIRURL_stream_manifest(self): + d = self.POST(self.public_url + "/foo/?t=stream-manifest") + def _check(res): + self.failUnless(res.endswith("\n")) + units = [simplejson.loads(t) for t in res[:-1].split("\n")] + self.failUnlessEqual(len(units), 7) + self.failUnlessEqual(units[-1]["type"], "stats") + first = units[0] + self.failUnlessEqual(first["path"], []) + self.failUnlessEqual(first["cap"], self._foo_uri) + self.failUnlessEqual(first["type"], "directory") + baz = [u for u in units[:-1] if u["cap"] == self._baz_file_uri][0] + self.failUnlessEqual(baz["path"], ["sub", "baz.txt"]) + self.failIfEqual(baz["storage-index"], None) + self.failIfEqual(baz["verifycap"], None) + self.failIfEqual(baz["repaircap"], None) + return + d.addCallback(_check) + return d + def test_GET_DIRURL_uri(self): d = self.GET(self.public_url + "/foo?t=uri") def _check(res): diff --git a/src/allmydata/web/directory.py b/src/allmydata/web/directory.py index a76f7090..27f8ac9c 100644 --- a/src/allmydata/web/directory.py +++ b/src/allmydata/web/directory.py @@ -16,6 +16,7 @@ from allmydata.uri import from_string_dirnode from allmydata.interfaces import IDirectoryNode, IFileNode, IMutableFileNode, \ ExistingChildError, NoSuchChildError from allmydata.monitor import Monitor +from allmydata import dirnode from allmydata.web.common import text_plain, WebError, \ IClient, IOpHandleTable, NeedOperationHandleError, \ boolean_of_arg, get_arg, get_root, \ @@ -192,6 +193,8 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): d = self._POST_start_deep_size(ctx) elif t == "start-deep-stats": d = self._POST_start_deep_stats(ctx) + elif t == "stream-manifest": + d = self._POST_stream_manifest(ctx) elif t == "set_children": # TODO: docs d = self._POST_set_children(req) @@ -394,6 +397,11 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): renderer = DeepStatsResults(monitor) return self._start_operation(monitor, renderer, ctx) + def _POST_stream_manifest(self, ctx): + walker = ManifestStreamer(ctx, self.node) + monitor = self.node.deep_traverse(walker) + return monitor.when_done() + def _POST_set_children(self, req): replace = boolean_of_arg(get_arg(req, "replace", "true")) req.content.seek(0) @@ -812,3 +820,48 @@ class DeepStatsResults(rend.Page): s = self.monitor.get_status().copy() s["finished"] = self.monitor.is_finished() return simplejson.dumps(s, indent=1) + +class ManifestStreamer(dirnode.DeepStats): + + def __init__(self, ctx, origin): + dirnode.DeepStats.__init__(self, origin) + self.req = IRequest(ctx) + + def add_node(self, node, path): + dirnode.DeepStats.add_node(self, node, path) + d = {"path": path, + "cap": node.get_uri()} + + if IDirectoryNode.providedBy(node): + d["type"] = "directory" + else: + d["type"] = "file" + + v = node.get_verify_cap() + if v: + v = v.to_string() + d["verifycap"] = v + + r = node.get_repair_cap() + if r: + r = r.to_string() + d["repaircap"] = r + + si = node.get_storage_index() + if si: + si = base32.b2a(si) + d["storage-index"] = si + + j = simplejson.dumps(d, ensure_ascii=True) + assert "\n" not in j + self.req.write(j+"\n") + + def finish(self): + stats = dirnode.DeepStats.get_results(self) + d = {"type": "stats", + "stats": stats, + } + j = simplejson.dumps(d, ensure_ascii=True) + assert "\n" not in j + self.req.write(j+"\n") + return "" -- 2.45.2