+import urllib, simplejson
+from twisted.protocols.basic import LineOnlyReceiver
from allmydata.util import base32
from allmydata.util.abbreviate import abbreviate_space_both
from allmydata import uri
from allmydata.scripts.slow_operation import SlowOperationRunner
+from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path
+from allmydata.scripts.common_http import do_http
+
+class FakeTransport:
+ disconnecting = False
+class ManifestStreamer(LineOnlyReceiver):
+ delimiter = "\n"
+ def __init__(self):
+ self.transport = FakeTransport()
+
+ def run(self, options):
+ stdout = options.stdout
+ stderr = options.stderr
+ self.options = options
+ nodeurl = options['node-url']
+ if not nodeurl.endswith("/"):
+ nodeurl += "/"
+ self.nodeurl = nodeurl
+ where = options.where
+ rootcap, path = get_alias(options.aliases, where, DEFAULT_ALIAS)
+ if path == '/':
+ path = ''
+ url = nodeurl + "uri/%s" % urllib.quote(rootcap)
+ if path:
+ url += "/" + escape_path(path)
+ # todo: should it end with a slash?
+ url += "?t=stream-manifest"
+ resp = do_http("POST", url)
+ if resp.status not in (200, 302):
+ print >>stderr, "ERROR", resp.status, resp.reason, resp.read()
+ return 1
+ #print "RESP", dir(resp)
+ # use Twisted to split this into lines
+ while True:
+ chunk = resp.read(100)
+ if not chunk:
+ break
+ if self.options["raw"]:
+ stdout.write(chunk)
+ else:
+ self.dataReceived(chunk)
+ return 0
+
+ def lineReceived(self, line):
+ d = simplejson.loads(line)
+ stdout = self.options.stdout
+ if d["type"] in ("file", "directory"):
+ if self.options["storage-index"]:
+ si = d["storage-index"]
+ if si:
+ print >>stdout, si
+ elif self.options["verify-cap"]:
+ vc = d["verifycap"]
+ if vc:
+ print >>stdout, vc
+ elif self.options["repair-cap"]:
+ vc = d["repaircap"]
+ if vc:
+ print >>stdout, vc
+ else:
+ try:
+ print >>stdout, d["cap"], "/".join(d["path"])
+ except UnicodeEncodeError:
+ print >>stdout, d["cap"], "/".join([p.encode("utf-8")
+ for p in d["path"]])
+
+
class ManifestGrabber(SlowOperationRunner):
for p in path])
def manifest(options):
- return ManifestGrabber().run(options)
+ if options["stream"]:
+ return ManifestStreamer().run(options)
+ else:
+ return ManifestGrabber().run(options)
class StatsGrabber(SlowOperationRunner):
d.addCallback(lambda ign: self.do_cli_manifest1())
d.addCallback(lambda ign: self.do_cli_manifest2())
d.addCallback(lambda ign: self.do_cli_manifest3())
+ d.addCallback(lambda ign: self.do_cli_manifest_stream1())
+ d.addCallback(lambda ign: self.do_cli_manifest_stream2())
+ d.addCallback(lambda ign: self.do_cli_manifest_stream3())
+ d.addCallback(lambda ign: self.do_cli_manifest_stream4())
+ d.addCallback(lambda ign: self.do_cli_manifest_stream5())
d.addCallback(lambda ign: self.do_cli_stats1())
d.addCallback(lambda ign: self.do_cli_stats2())
return d
d = self._run_cli(["manifest",
"--node-directory", basedir,
"--storage-index", self.root_uri])
- def _check2((out,err)):
+ def _check((out,err)):
self.failUnlessEqual(err, "")
- lines = [l for l in out.split("\n") if l]
- self.failUnlessEqual(len(lines), 3)
- self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
- self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
- self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
- d.addCallback(_check2)
+ self._check_manifest_storage_index(out)
+ d.addCallback(_check)
return d
+ def _check_manifest_storage_index(self, out):
+ lines = [l for l in out.split("\n") if l]
+ self.failUnlessEqual(len(lines), 3)
+ self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
+ self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
+ self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
+
def do_cli_manifest3(self):
basedir = self.getdir("client0")
d = self._run_cli(["manifest",
d.addCallback(_check2r)
return d
+ def do_cli_manifest_stream1(self):
+ basedir = self.getdir("client0")
+ d = self._run_cli(["manifest",
+ "--node-directory", basedir,
+ "--stream",
+ self.root_uri])
+ def _check((out,err)):
+ self.failUnlessEqual(err, "")
+ lines = [l for l in out.split("\n") if l]
+ self.failUnlessEqual(len(lines), 5)
+ caps = {}
+ for l in lines:
+ try:
+ cap, path = l.split(None, 1)
+ except ValueError:
+ cap = l.strip()
+ path = ""
+ caps[cap] = path
+ self.failUnless(self.root.get_uri() in caps)
+ self.failUnlessEqual(caps[self.root.get_uri()], "")
+ self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
+ self.failUnlessEqual(caps[self.large.get_uri()], "large")
+ self.failUnlessEqual(caps[self.small.get_uri()], "small")
+ self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
+ d.addCallback(_check)
+ return d
+
+ def do_cli_manifest_stream2(self):
+ basedir = self.getdir("client0")
+ d = self._run_cli(["manifest",
+ "--node-directory", basedir,
+ "--stream", "--raw",
+ self.root_uri])
+ def _check((out,err)):
+ self.failUnlessEqual(err, "")
+ # this should be the same as the POST t=stream-manifest output
+ self._check_streamed_manifest(out)
+ d.addCallback(_check)
+ return d
+
+ def do_cli_manifest_stream3(self):
+ basedir = self.getdir("client0")
+ d = self._run_cli(["manifest",
+ "--node-directory", basedir,
+ "--stream", "--storage-index",
+ self.root_uri])
+ def _check((out,err)):
+ self.failUnlessEqual(err, "")
+ self._check_manifest_storage_index(out)
+ d.addCallback(_check)
+ return d
+
+ def do_cli_manifest_stream4(self):
+ basedir = self.getdir("client0")
+ d = self._run_cli(["manifest",
+ "--node-directory", basedir,
+ "--stream", "--verify-cap",
+ self.root_uri])
+ def _check((out,err)):
+ self.failUnlessEqual(err, "")
+ lines = [l for l in out.split("\n") if l]
+ self.failUnlessEqual(len(lines), 3)
+ self.failUnless(self.root.get_verify_cap().to_string() in lines)
+ self.failUnless(self.mutable.get_verify_cap().to_string() in lines)
+ self.failUnless(self.large.get_verify_cap().to_string() in lines)
+ d.addCallback(_check)
+ return d
+
+ def do_cli_manifest_stream5(self):
+ basedir = self.getdir("client0")
+ d = self._run_cli(["manifest",
+ "--node-directory", basedir,
+ "--stream", "--repair-cap",
+ self.root_uri])
+ def _check((out,err)):
+ self.failUnlessEqual(err, "")
+ lines = [l for l in out.split("\n") if l]
+ self.failUnlessEqual(len(lines), 3)
+ self.failUnless(self.root.get_repair_cap().to_string() in lines)
+ self.failUnless(self.mutable.get_repair_cap().to_string() in lines)
+ self.failUnless(self.large.get_repair_cap().to_string() in lines)
+ d.addCallback(_check)
+ return d
+
def do_cli_stats1(self):
basedir = self.getdir("client0")
d = self._run_cli(["stats",