print
return 0
-def dump_root_dirnode(basedir, config):
+def dump_root_dirnode(basedir, config, output=sys.stdout):
from allmydata import uri
root_dirnode_file = os.path.join(basedir, "vdrive", "root")
f = open(root_dirnode_file, "rb")
key = f.read()
rooturi = uri.pack_dirnode_uri("fakeFURL", key)
- print rooturi
+ print >>output, rooturi
return 0
except EnvironmentError:
- print "unable to read root dirnode file from %s" % root_dirnode_file
+ print >>output, "unable to read root dirnode file from %s" % \
+ root_dirnode_file
return 1
-def dump_directory_node(basedir, config):
+def dump_directory_node(basedir, config, f=sys.stdout):
from allmydata import filetable, vdrive, uri
from allmydata.util import hashutil, idlib
dir_uri = config['uri']
filename = os.path.join(basedir, "vdrive", idlib.b2a(index))
- print
- print "dirnode uri: %s" % dir_uri
- print "filename : %s" % filename
- print "index : %s" % idlib.b2a(index)
+ print >>f
+ print >>f, "dirnode uri: %s" % dir_uri
+ print >>f, "filename : %s" % filename
+ print >>f, "index : %s" % idlib.b2a(index)
if wk:
- print "writekey : %s" % idlib.b2a(wk)
- print "write_enabler: %s" % idlib.b2a(we)
+ print >>f, "writekey : %s" % idlib.b2a(wk)
+ print >>f, "write_enabler: %s" % idlib.b2a(we)
else:
- print "writekey : None"
- print "write_enabler: None"
- print "readkey : %s" % idlib.b2a(rk)
+ print >>f, "writekey : None"
+ print >>f, "write_enabler: None"
+ print >>f, "readkey : %s" % idlib.b2a(rk)
- print
+ print >>f
vds = filetable.VirtualDriveServer(os.path.join(basedir, "vdrive"), False)
data = vds._read_from_file(index)
if we:
if we != data[0]:
- print "ERROR: write_enabler does not match"
+ print >>f, "ERROR: write_enabler does not match"
for (H_key, E_key, E_write, E_read) in data[1]:
if verbose:
- print " H_key %s" % idlib.b2a(H_key)
- print " E_key %s" % idlib.b2a(E_key)
- print " E_write %s" % idlib.b2a(E_write)
- print " E_read %s" % idlib.b2a(E_read)
+ print >>f, " H_key %s" % idlib.b2a(H_key)
+ print >>f, " E_key %s" % idlib.b2a(E_key)
+ print >>f, " E_write %s" % idlib.b2a(E_write)
+ print >>f, " E_read %s" % idlib.b2a(E_read)
key = vdrive.decrypt(rk, E_key)
- print " key %s" % key
+ print >>f, " key %s" % key
if hashutil.dir_name_hash(rk, key) != H_key:
- print " ERROR: H_key does not match"
+ print >>f, " ERROR: H_key does not match"
if wk and E_write:
if len(E_write) < 14:
- print " ERROR: write data is short:", idlib.b2a(E_write)
+ print >>f, " ERROR: write data is short:", idlib.b2a(E_write)
write = vdrive.decrypt(wk, E_write)
- print " write: %s" % write
+ print >>f, " write: %s" % write
read = vdrive.decrypt(rk, E_read)
- print " read: %s" % read
- print
+ print >>f, " read: %s" % read
+ print >>f
return 0
from twisted.trial import unittest
+from cStringIO import StringIO
from twisted.python import usage
import os.path
from allmydata.scripts import runner
[],
run_by_human=False)
+class Diagnostics(unittest.TestCase):
+ def test_dump_root_dirnode_failure(self):
+ s = StringIO()
+ rc = runner.dump_root_dirnode("missing_basedir", {}, s)
+ output = s.getvalue()
+ self.failUnless("unable to read root dirnode file from" in output)
+ self.failIfEqual(rc, 0)
+from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import failure
from allmydata import vdrive, filetable, uri
from allmydata.interfaces import IDirectoryNode
+from allmydata.scripts import runner
from foolscap import eventual
class LocalReference:
class Test(unittest.TestCase):
def test_create_directory(self):
- basedir = "vdrive/test_create_directory"
+ basedir = "vdrive/test_create_directory/vdrive"
vds = filetable.VirtualDriveServer(basedir)
vds.set_furl("myFURL")
self.client = client = MyClient(vds, "myFURL")
return d
def test_one(self):
- basedir = "vdrive/test_one"
+ self.basedir = basedir = "vdrive/test_one/vdrive"
vds = filetable.VirtualDriveServer(basedir)
vds.set_furl("myFURL")
root_uri = vds.get_public_root_uri()
d.addCallback(lambda res:self.bar_node_readonly.list())
d.addCallback(self.failUnlessKeysMatch, ["baz", "file4"])
+ d.addCallback(self._test_one_3)
return d
+ def _test_one_3(self, res):
+ # now test some of the diag tools with the data we've created
+ s = StringIO()
+ rc = runner.dump_root_dirnode("vdrive/test_one", {}, s)
+ output = s.getvalue()
+ self.failUnless(output.startswith("URI:DIR:fakeFURL:"))
+ self.failUnlessEqual(rc, 0)
+
+ s = StringIO()
+ args = {'uri': self.bar_node.get_uri(),
+ 'verbose': True,
+ }
+ rc = runner.dump_directory_node("vdrive/test_one", args, s)
+ output = s.getvalue()
+ #print output
+ self.failUnless("dirnode uri: URI:DIR:myFURL" in output)
+ self.failUnless("write_enabler" in output)
+ self.failIf("write_enabler: None" in output)
+ self.failUnless("key baz\n" in output)
+ self.failUnless(" write: URI:DIR:myFURL:" in output)
+ self.failUnless(" read: URI:DIR-RO:myFURL:" in output)
+ self.failUnless("key file4\n" in output)
+ self.failUnless("H_key " in output)
+ self.failUnlessEqual(rc, 0)
+
+ s = StringIO()
+ args = {'uri': self.bar_node.get_uri(),
+ 'verbose': False,
+ }
+ rc = runner.dump_directory_node("vdrive/test_one", args, s)
+ output = s.getvalue()
+ #print output
+ self.failUnless("dirnode uri: URI:DIR:myFURL" in output)
+ self.failUnless("write_enabler" in output)
+ self.failIf("write_enabler: None" in output)
+ self.failUnless("key baz\n" in output)
+ self.failUnless(" write: URI:DIR:myFURL:" in output)
+ self.failUnless(" read: URI:DIR-RO:myFURL:" in output)
+ self.failUnless("key file4\n" in output)
+ self.failIf("H_key " in output)
+ self.failUnlessEqual(rc, 0)
+
+ s = StringIO()
+ args = {'uri': self.bar_node_readonly.get_uri(),
+ 'verbose': True,
+ }
+ rc = runner.dump_directory_node("vdrive/test_one", args, s)
+ output = s.getvalue()
+ #print output
+ self.failUnless("dirnode uri: URI:DIR-RO:myFURL" in output)
+ self.failUnless("write_enabler: None" in output)
+ self.failUnless("key baz\n" in output)
+ self.failIf(" write: URI:DIR:myFURL:" in output)
+ self.failUnless(" read: URI:DIR-RO:myFURL:" in output)
+ self.failUnless("key file4\n" in output)
+ self.failUnlessEqual(rc, 0)
+
def shouldFail(self, res, expected_failure, which, substring=None):
if isinstance(res, failure.Failure):
res.trap(expected_failure)