* collected handles (i.e. the GET page has been retrieved at least once
since the operation completed) will remain valid for ten minutes.
+Many "slow" operations can begin to use unacceptable amounts of memory when
+operating on large directory structures. The memory usage increases when the
+ophandle is polled, as the results must be copied into a JSON string, sent
+over the wire, then parsed by a client. So, as an alternative, many "slow"
+operations have streaming equivalents. These equivalents do not use operation
+handles. Instead, they emit line-oriented status results immediately. Client
+code can cancel the operation by simply closing the HTTP connection.
== Programmatic Operations ==
share management data (leases)
backend (ext3) minimum block size
+POST $URL?t=stream-manifest
+
+ This operation performs a recursive walk of all files and directories
+ reachable from the given starting point. For each such unique object
+ (duplicates are skipped), a single line of JSON is emitted to the HTTP
+ response channel. When the walk is complete, a final line of JSON is emitted
+ which contains the accumulated file-size/count "deep-stats" data.
+
+ A CLI tool can split the response stream on newlines into "response units",
+ and parse each response unit as JSON. Each such parsed unit will be a
+ dictionary, and will contain at least the "type" key: a string, one of
+ "file", "directory", or "stats".
+
+ For all units that have a type of "file" or "directory", the dictionary will
+ contain the following keys:
+
+ "path": a list of strings, with the path that is traversed to reach the
+ object
+ "cap": a writecap for the file or directory, if available, else a readcap
+ "verifycap": a verifycap for the file or directory
+ "repaircap": the weakest cap which can still be used to repair the object
+ "storage-index": a base32 storage index for the object
+
+  Note that non-distributed files (i.e. LIT files) will have values of None
+  (serialized as JSON null) for verifycap, repaircap, and storage-index, since
+  these files can neither be verified nor repaired, and are not stored on the
+  storage servers.
+
+ The last unit in the stream will have a type of "stats", and will contain
+ the keys described in the "start-deep-stats" operation, below.
+
+
== Other Useful Pages ==
The portion of the web namespace that begins with "/uri" (and "/named") is
return self.my_uri.to_string()
def get_verify_cap(self):
return self.my_uri.get_verify_cap()
+    def get_repair_cap(self):
+        # For an immutable file the repair cap is its verify cap: repairing
+        # needs no write authority. Presumably this fake mirrors the real
+        # immutable-node behavior -- NOTE(review): confirm against the real
+        # node class.
+        return self.my_uri.get_verify_cap()
def get_storage_index(self):
return self.storage_index
self.fail("%s: not JSON: '%s'" % (url, s))
return data
+ def parse_streamed_json(self, s):
+ for unit in s.split("\n"):
+ if not unit:
+ # stream should end with a newline, so split returns ""
+ continue
+ yield simplejson.loads(unit)
+
def web(self, n, method="GET", **kwargs):
# returns (data, url)
url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
d = self.set_up_nodes()
d.addCallback(self.set_up_tree)
d.addCallback(self.do_stats)
+ d.addCallback(self.do_web_stream_manifest)
d.addCallback(self.do_test_check_good)
d.addCallback(self.do_test_web_good)
d.addCallback(self.do_test_cli_good)
self.failUnlessEqual(s["size-immutable-files"], 13000)
self.failUnlessEqual(s["size-literal-files"], 48)
+ def do_web_stream_manifest(self, ignored):
+ d = self.web(self.root, method="POST", t="stream-manifest")
+ def _check((res,url)):
+ units = list(self.parse_streamed_json(res))
+ files = [u for u in units if u["type"] in ("file", "directory")]
+ assert units[-1]["type"] == "stats"
+ stats = units[-1]["stats"]
+ self.failUnlessEqual(len(files), 5)
+ # [root,mutable,large] are distributed, [small,small2] are not
+ self.failUnlessEqual(len([f for f in files
+ if f["verifycap"] is not None]), 3)
+ self.failUnlessEqual(len([f for f in files
+ if f["verifycap"] is None]), 2)
+ self.failUnlessEqual(len([f for f in files
+ if f["repaircap"] is not None]), 3)
+ self.failUnlessEqual(len([f for f in files
+ if f["repaircap"] is None]), 2)
+ self.failUnlessEqual(len([f for f in files
+ if f["storage-index"] is not None]), 3)
+ self.failUnlessEqual(len([f for f in files
+ if f["storage-index"] is None]), 2)
+ # make sure that a mutable file has filecap==repaircap!=verifycap
+ mutable = [f for f in files
+ if f["cap"] is not None
+ and f["cap"].startswith("URI:SSK:")][0]
+ self.failUnlessEqual(mutable["cap"], self.mutable_uri)
+ self.failIfEqual(mutable["cap"], mutable["verifycap"])
+ self.failUnlessEqual(mutable["cap"], mutable["repaircap"])
+ # for immutable file, verifycap==repaircap!=filecap
+ large = [f for f in files
+ if f["cap"] is not None
+ and f["cap"].startswith("URI:CHK:")][0]
+ self.failUnlessEqual(large["cap"], self.large_uri)
+ self.failIfEqual(large["cap"], large["verifycap"])
+ self.failUnlessEqual(large["verifycap"], large["repaircap"])
+ self.check_stats_good(stats)
+ d.addCallback(_check)
+ return d
+
def do_test_check_good(self, ignored):
d = defer.succeed(None)
# check the individual items
foo.set_uri(unicode_filename, self._bar_txt_uri)
_ign, n, baz_file = self.makefile(2)
+ self._baz_file_uri = baz_file
sub.set_uri(u"baz.txt", baz_file)
_ign, n, self._bad_file_uri = self.makefile(3)
d.addCallback(_got_json)
return d
+ def test_POST_DIRURL_stream_manifest(self):
+ d = self.POST(self.public_url + "/foo/?t=stream-manifest")
+ def _check(res):
+ self.failUnless(res.endswith("\n"))
+ units = [simplejson.loads(t) for t in res[:-1].split("\n")]
+ self.failUnlessEqual(len(units), 7)
+ self.failUnlessEqual(units[-1]["type"], "stats")
+ first = units[0]
+ self.failUnlessEqual(first["path"], [])
+ self.failUnlessEqual(first["cap"], self._foo_uri)
+ self.failUnlessEqual(first["type"], "directory")
+ baz = [u for u in units[:-1] if u["cap"] == self._baz_file_uri][0]
+ self.failUnlessEqual(baz["path"], ["sub", "baz.txt"])
+ self.failIfEqual(baz["storage-index"], None)
+ self.failIfEqual(baz["verifycap"], None)
+ self.failIfEqual(baz["repaircap"], None)
+ return
+ d.addCallback(_check)
+ return d
+
def test_GET_DIRURL_uri(self):
d = self.GET(self.public_url + "/foo?t=uri")
def _check(res):
from allmydata.interfaces import IDirectoryNode, IFileNode, IMutableFileNode, \
ExistingChildError, NoSuchChildError
from allmydata.monitor import Monitor
+from allmydata import dirnode
from allmydata.web.common import text_plain, WebError, \
IClient, IOpHandleTable, NeedOperationHandleError, \
boolean_of_arg, get_arg, get_root, \
d = self._POST_start_deep_size(ctx)
elif t == "start-deep-stats":
d = self._POST_start_deep_stats(ctx)
+ elif t == "stream-manifest":
+ d = self._POST_stream_manifest(ctx)
elif t == "set_children":
# TODO: docs
d = self._POST_set_children(req)
renderer = DeepStatsResults(monitor)
return self._start_operation(monitor, renderer, ctx)
+ def _POST_stream_manifest(self, ctx):
+ walker = ManifestStreamer(ctx, self.node)
+ monitor = self.node.deep_traverse(walker)
+ return monitor.when_done()
+
def _POST_set_children(self, req):
replace = boolean_of_arg(get_arg(req, "replace", "true"))
req.content.seek(0)
s = self.monitor.get_status().copy()
s["finished"] = self.monitor.is_finished()
return simplejson.dumps(s, indent=1)
+
+class ManifestStreamer(dirnode.DeepStats):
+
+ def __init__(self, ctx, origin):
+ dirnode.DeepStats.__init__(self, origin)
+ self.req = IRequest(ctx)
+
+ def add_node(self, node, path):
+ dirnode.DeepStats.add_node(self, node, path)
+ d = {"path": path,
+ "cap": node.get_uri()}
+
+ if IDirectoryNode.providedBy(node):
+ d["type"] = "directory"
+ else:
+ d["type"] = "file"
+
+ v = node.get_verify_cap()
+ if v:
+ v = v.to_string()
+ d["verifycap"] = v
+
+ r = node.get_repair_cap()
+ if r:
+ r = r.to_string()
+ d["repaircap"] = r
+
+ si = node.get_storage_index()
+ if si:
+ si = base32.b2a(si)
+ d["storage-index"] = si
+
+ j = simplejson.dumps(d, ensure_ascii=True)
+ assert "\n" not in j
+ self.req.write(j+"\n")
+
+ def finish(self):
+ stats = dirnode.DeepStats.get_results(self)
+ d = {"type": "stats",
+ "stats": stats,
+ }
+ j = simplejson.dumps(d, ensure_ascii=True)
+ assert "\n" not in j
+ self.req.write(j+"\n")
+ return ""