add 'tahoe debug consolidate' command, to merge directories created by repeated ...
authorBrian Warner <warner@allmydata.com>
Thu, 12 Mar 2009 20:56:06 +0000 (13:56 -0700)
committerBrian Warner <warner@allmydata.com>
Thu, 12 Mar 2009 20:56:06 +0000 (13:56 -0700)
src/allmydata/scripts/consolidate.py [new file with mode: 0644]
src/allmydata/scripts/debug.py
src/allmydata/test/test_consolidate.py [new file with mode: 0644]

diff --git a/src/allmydata/scripts/consolidate.py b/src/allmydata/scripts/consolidate.py
new file mode 100644 (file)
index 0000000..5fccff3
--- /dev/null
@@ -0,0 +1,364 @@
+
+import os, pickle
+import sqlite3 as sqlite
+
+import urllib
+import simplejson
+from allmydata.scripts.common_http import do_http
+from allmydata.scripts.tahoe_backup import parse_old_timestamp, readonly, \
+     raiseHTTPError, HTTPError
+from allmydata.util import hashutil, base32
+from allmydata.util.netstring import netstring
+from allmydata.scripts.common import get_alias, DEFAULT_ALIAS
+
+TAG = "consolidator_dirhash_v1"
+
+class CycleDetected(Exception):
+    pass
+
+
+class Consolidator:
+    def __init__(self, options):
+        self.options = options
+        self.rootcap, path = get_alias(options.aliases, options.where,
+                                       DEFAULT_ALIAS)
+        assert path == ""
+        self.dbfile = options["dbfile"]
+        assert self.dbfile, "--dbfile is required"
+        self.backupfile = options["backupfile"]
+        assert self.backupfile, "--backupfile is required"
+        self.nodeurl = options["node-url"]
+        if not self.nodeurl.endswith("/"):
+            self.nodeurl += "/"
+        self.must_rescan_readonly_snapshots = not os.path.exists(self.dbfile)
+        self.db = sqlite.connect(self.dbfile)
+        self.cursor = self.db.cursor()
+        try:
+            self.cursor.execute("CREATE TABLE dirhashes"
+                                "("
+                                " dirhash TEXT PRIMARY KEY,"
+                                " dircap TEXT"
+                                ")")
+        except sqlite.OperationalError, e:
+            if "table dirhashes already exists" not in str(e):
+                raise
+        self.visited = set()
+
+    def read_directory_json(self, dircap):
+        url = self.nodeurl + "uri/%s?t=json" % urllib.quote(dircap)
+        resp = do_http("GET", url)
+        if resp.status != 200:
+            raiseHTTPError("Error during directory GET", resp)
+        jd = simplejson.load(resp)
+        ntype, ndata = jd
+        if ntype != "dirnode":
+            return None
+        return ndata
+
+    def msg(self, text):
+        print >>self.options.stdout, text
+    def err(self, text):
+        print >>self.options.stderr, text
+
+    def consolidate(self):
+        try:
+            data = self.read_directory_json(self.rootcap + "/Backups")
+        except HTTPError:
+            self.err("Unable to list /Backups, maybe this account has none?")
+            return 1
+        kids = data["children"]
+        potential_systems = {}
+        for (childname, (childtype, childdata)) in kids.items():
+            if childtype != "dirnode":
+                continue
+            potential_systems[childname] = str(childdata["rw_uri"])
+        backup_data = {"Backups": data, "systems": {}, "archives": {}}
+        systems = {}
+        for name, sdircap in potential_systems.items():
+            sdata = self.read_directory_json(sdircap)
+            kids = sdata["children"]
+            if not u"Archives" in kids and not u"Latest Backup" in kids:
+                self.msg("%s: not a backupdir, no 'Archives' and 'Latest'" % name)
+                continue
+            self.msg("%s is a system" % name)
+            backup_data["systems"][name] = sdata
+            archives_dircap = kids[u"Archives"][1]["rw_uri"]
+            archives_data = self.read_directory_json(archives_dircap)
+            backup_data["archives"][name] = archives_data
+            systems[name] = archives_dircap
+        if not systems:
+            self.msg("No systems under /Backups, nothing to consolidate")
+            return 0
+        if not os.path.exists(self.backupfile):
+            f = open(self.backupfile, "wb")
+            pickle.dump(backup_data, f)
+            f.close()
+
+        for name, archives_dircap in sorted(systems.items()):
+            self.do_system(name, archives_dircap)
+        return 0
+
+    def do_system(self, system_name, archives_dircap):
+        # first we walk through the Archives list, looking for the existing
+        # snapshots. Each one will have a $NAME like "2008-11-16 10.34 PM"
+        # (in various forms: we use tahoe_backup.parse_old_timestamp to
+        # interpret it). At first, they'll all have $NAME and be writecaps.
+        # As we run, we will create $NAME-readonly (with a readcap) for each
+        # one (the first one will just be the readonly equivalent of the
+        # oldest snapshot: the others will be constructed out of shared
+        # directories). When we're done we'll have a $NAME-readonly for
+        # everything except the latest snapshot (to avoid any danger of
+        # modifying a backup that's already in progress). The very last step,
+        # which won't be enabled until we're sure that everything is working
+        # correctly, will replace each $NAME with $NAME-readonly.
+
+        # We maintain a table that maps dirhash (hash of directory contents)
+        # to a directory readcap which contains those contents. We use this
+        # to decide if we can link to an existing directory, or if we must
+        # create a brand new one. Usually we add to this table in two places:
+        # when we scan the oldest snapshot (which we've just converted to
+        # readonly form), and when we must create a brand new one. If the
+        # table doesn't exist (probably because we've manually deleted it),
+        # we will scan *all* the existing readonly snapshots, and repopulate
+        # the table. We keep this table in a SQLite database (rather than a
+        # pickle) because we want to update it persistently after every
+        # directory creation, and writing out a 10k entry pickle takes about
+        # 250ms
+
+        # 'snapshots' maps timestamp to [rwname, writecap, roname, readcap].
+        # The possibilities are:
+        #  [$NAME, writecap, None, None] : haven't touched it
+        #  [$NAME, writecap, $NAME-readonly, readcap] : processed, not replaced
+        #  [None, None, $NAME, readcap] : processed and replaced
+
+        self.msg("consolidating system %s" % system_name)
+        self.directories_reused = 0
+        self.directories_used_as_is = 0
+        self.directories_created = 0
+        data = self.read_directory_json(archives_dircap)
+        snapshots = {}
+
+        for (childname, (childtype, childdata)) in data["children"].items():
+            if childtype != "dirnode":
+                self.msg("non-dirnode %s in Archives/" % childname)
+                continue
+            timename = childname
+            if childname.endswith("-readonly"):
+                timename = childname[:-len("-readonly")]
+            timestamp = parse_old_timestamp(timename, self.options)
+            assert timestamp is not None, timename
+            snapshots.setdefault(timestamp, [None, None, None, None])
+            # if the snapshot is readonly (i.e. it has no rw_uri), we might
+            # need to re-scan it
+            is_readonly = not childdata.has_key("rw_uri")
+            if is_readonly:
+                readcap = str(childdata["ro_uri"])
+                if self.must_rescan_readonly_snapshots:
+                    self.scan_old_directory(str(childdata["ro_uri"]))
+                snapshots[timestamp][2] = childname
+                snapshots[timestamp][3] = readcap
+            else:
+                writecap = str(childdata["rw_uri"])
+                snapshots[timestamp][0] = childname
+                snapshots[timestamp][1] = writecap
+        else:
+            self.msg(" No snapshots in Backups/Archives")
+        snapshots = [ [timestamp] + values
+                      for (timestamp, values) in snapshots.items() ]
+        # now 'snapshots' is [timestamp, rwname, writecap, roname, readcap],
+        # which makes it easier to process in temporal order
+        snapshots.sort()
+        self.msg(" %d snapshots" % len(snapshots))
+        # we always ignore the last one, for safety
+        snapshots = snapshots[:-1]
+
+        first_snapshot = True
+        for (timestamp, rwname, writecap, roname, readcap) in snapshots:
+            start_created = self.directories_created
+            start_used_as_is = self.directories_used_as_is
+            start_reused = self.directories_reused
+
+            # [None, None, $NAME, readcap] : processed and replaced
+            # [$NAME, writecap, $NAME-readonly, readcap] : processed, not replaced
+            # [$NAME, writecap, None, None] : haven't touched it
+
+            if readcap and not writecap:
+                # skip past anything we've already processed and replaced
+                assert roname
+                assert not rwname
+                first_snapshot = False
+                self.msg(" %s already readonly" % roname)
+                continue
+            if readcap and writecap:
+                # we've processed it, creating a -readonly version, but we
+                # haven't replaced it.
+                assert roname
+                assert rwname
+                first_snapshot = False
+                self.msg(" %s processed but not yet replaced" % roname)
+                if self.options["really"]:
+                    self.msg("  replacing %s with %s" % (rwname, roname))
+                    self.put_child(archives_dircap, rwname, readcap)
+                    self.delete_child(archives_dircap, roname)
+                continue
+            assert writecap
+            assert rwname
+            assert not readcap
+            assert not roname
+            roname = rwname + "-readonly"
+            # for the oldest snapshot, we can do a simple readonly conversion
+            if first_snapshot:
+                first_snapshot = False
+                readcap = readonly(writecap)
+                self.directories_used_as_is += 1
+                self.msg(" %s: oldest snapshot, using as-is" % rwname)
+                self.scan_old_directory(readcap)
+            else:
+                # for the others, we must scan their contents and build up a new
+                # readonly directory (which shares common subdirs with previous
+                # backups)
+                self.msg(" %s: processing" % rwname)
+                readcap = self.process_directory(readonly(writecap))
+            if self.options["really"]:
+                self.msg("  replaced %s" % rwname)
+                self.put_child(archives_dircap, rwname, readcap)
+            else:
+                self.msg("  created %s" % roname)
+                self.put_child(archives_dircap, roname, readcap)
+
+            snapshot_created = self.directories_created - start_created
+            snapshot_used_as_is = self.directories_used_as_is - start_used_as_is
+            snapshot_reused = self.directories_reused - start_reused
+            self.msg("  %s: done: %d dirs created, %d used as-is, %d reused"
+                     % (rwname,
+                        snapshot_created, snapshot_used_as_is, snapshot_reused))
+        # done!
+        self.msg(" system done, %d dirs created, %d used as-is, %d reused" \
+                 % (self.directories_created, self.directories_used_as_is,
+                    self.directories_reused))
+
+    def process_directory(self, readcap):
+        # I walk all my children (recursing over any subdirectories), build
+        # up a table of my contents, then see if I can re-use an old
+        # directory with the same contents. If not, I create a new directory
+        # for my contents. In all cases I return a directory readcap that
+        # points to my contents.
+
+        # build up contents to pass to mkdir() (which uses t=set_children)
+        contents = {} # childname -> (type, rocap, metadata)
+        data = self.read_directory_json(readcap)
+        assert data is not None
+        hashkids = []
+        num_dirs = 0
+        for (childname, (childtype, childdata)) in sorted(data["children"].items()):
+            if childtype == "dirnode":
+                num_dirs += 1
+                childcap = self.process_directory(str(childdata["ro_uri"]))
+                contents[childname] = ("dirnode", childcap, None)
+            else:
+                childcap = str(childdata["ro_uri"])
+                contents[childname] = (childtype, childcap, None)
+            hashkids.append( (childname, childcap) )
+
+        dirhash = self.hash_directory_contents(hashkids)
+        old_dircap = self.get_old_dirhash(dirhash)
+        if old_dircap:
+            assert isinstance(old_dircap, str)
+            self.directories_reused += 1
+            return old_dircap
+        if num_dirs == 0:
+            # we're allowed to re-use this directory
+            new_dircap = readonly(readcap)
+            assert isinstance(new_dircap, str)
+            self.store_dirhash(dirhash, new_dircap)
+            self.directories_used_as_is += 1
+            return new_dircap
+        # otherwise, we need to create a new directory
+        new_dircap = readonly(self.mkdir(contents))
+        assert isinstance(new_dircap, str)
+        self.store_dirhash(dirhash, new_dircap)
+        self.directories_created += 1
+        return new_dircap
+
+    def put_child(self, dircap, childname, childcap):
+        url = self.nodeurl + "uri/%s/%s?t=uri" % (urllib.quote(dircap),
+                                                  urllib.quote(childname))
+        resp = do_http("PUT", url, childcap)
+        if resp.status not in (200, 201):
+            raiseHTTPError("error during put_child", resp)
+
+    def delete_child(self, dircap, childname):
+        url = self.nodeurl + "uri/%s/%s" % (urllib.quote(dircap),
+                                            urllib.quote(childname))
+        resp = do_http("DELETE", url)
+        if resp.status not in (200, 201):
+            raiseHTTPError("error during delete_child", resp)
+
+    def mkdir(self, contents):
+        url = self.nodeurl + "uri?t=mkdir"
+        resp = do_http("POST", url)
+        if resp.status < 200 or resp.status >= 300:
+            raiseHTTPError("error during mkdir", resp)
+        dircap = str(resp.read().strip())
+        url = self.nodeurl + "uri/%s?t=set_children" % urllib.quote(dircap)
+        body = dict([ (childname, (contents[childname][0],
+                                   {"ro_uri": contents[childname][1],
+                                    "metadata": contents[childname][2],
+                                    }))
+                      for childname in contents
+                      ])
+        resp = do_http("POST", url, simplejson.dumps(body))
+        if resp.status != 200:
+            raiseHTTPError("error during set_children", resp)
+        return dircap
+
+    def scan_old_directory(self, dircap):
+        # scan this directory (recursively) and stash a hash of its contents
+        # in the DB. This assumes that all subdirs can be used as-is (i.e.
+        # they've already been declared immutable)
+        dircap = readonly(dircap)
+        if dircap in self.visited:
+            raise CycleDetected
+        self.visited.add(dircap)
+        data = self.read_directory_json(dircap)
+        kids = []
+        for (childname, (childtype, childdata)) in data["children"].items():
+            childcap = str(childdata["ro_uri"])
+            if childtype == "dirnode":
+                self.scan_old_directory(childcap)
+            kids.append( (childname, childcap) )
+        dirhash = self.hash_directory_contents(kids)
+        self.store_dirhash(dirhash, dircap)
+        return dirhash
+
+    def hash_directory_contents(self, kids):
+        kids.sort()
+        s = "".join([netstring(childname.encode("utf-8"))+netstring(childcap)
+                     for (childname, childcap) in kids])
+        return hashutil.tagged_hash(TAG, s)
+
+    def store_dirhash(self, dirhash, dircap):
+        assert isinstance(dircap, str)
+        # existing items should prevail
+        try:
+            c = self.cursor
+            c.execute("INSERT INTO dirhashes (dirhash, dircap) VALUES (?,?)",
+                      (base32.b2a(dirhash), dircap))
+            self.db.commit()
+        except sqlite.IntegrityError:
+            # already present
+            pass
+
+    def get_old_dirhash(self, dirhash):
+        self.cursor.execute("SELECT dircap FROM dirhashes WHERE dirhash=?",
+                            (base32.b2a(dirhash),))
+        row = self.cursor.fetchone()
+        if not row:
+            return None
+        (dircap,) = row
+        return str(dircap)
+
+
+def main(options):
+    c = Consolidator(options)
+    return c.consolidate()
index 7be41694839e21b08b86812d05c93677387b3044..75e5b595f801a3ba40c64ccb5621ab4658087e4d 100644 (file)
@@ -4,6 +4,7 @@
 import struct, time, os
 from twisted.python import usage, failure
 from twisted.internet import defer
+from allmydata.scripts.cli import VDriveOptions
 
 class DumpOptions(usage.Options):
     def getSynopsis(self):
@@ -760,6 +761,21 @@ def repl(options):
     return code.interact()
 
 
+class ConsolidateOptions(VDriveOptions):
+    optParameters = [
+        ("dbfile", None, None, "persistent file for reusable dirhashes"),
+        ("backupfile", "b", None, "file to store backup of Archives/ contents"),
+        ]
+    optFlags = [
+        ("really", None, "Really remove old snapshot directories"),
+        ]
+    def parseArgs(self, where):
+        self.where = where
+
+def consolidate(options):
+    from consolidate import main; return main(options)
+
+
 class DebugCommand(usage.Options):
     subCommands = [
         ["dump-share", None, DumpOptions,
@@ -769,6 +785,7 @@ class DebugCommand(usage.Options):
         ["catalog-shares", None, CatalogSharesOptions, "Describe shares in node dirs"],
         ["corrupt-share", None, CorruptShareOptions, "Corrupt a share"],
         ["repl", None, ReplOptions, "Open a python interpreter"],
+        ["consolidate", None, ConsolidateOptions, "Consolidate non-shared backups"],
         ]
     def postOptions(self):
         if not hasattr(self, 'subOptions'):
@@ -784,6 +801,7 @@ Subcommands:
     tahoe debug find-shares     Locate sharefiles in node directories
     tahoe debug catalog-shares  Describe all shares in node dirs
     tahoe debug corrupt-share   Corrupt a share by flipping a bit.
+    tahoe debug consolidate     Consolidate old non-shared backups into shared ones.
 
 Please run e.g. 'tahoe debug dump-share --help' for more details on each
 subcommand.
@@ -797,6 +815,7 @@ subDispatch = {
     "catalog-shares": catalog_shares,
     "corrupt-share": corrupt_share,
     "repl": repl,
+    "consolidate": consolidate,
     }
 
 
diff --git a/src/allmydata/test/test_consolidate.py b/src/allmydata/test/test_consolidate.py
new file mode 100644 (file)
index 0000000..acc0f49
--- /dev/null
@@ -0,0 +1,297 @@
+
+import os
+from cStringIO import StringIO
+import pickle
+from twisted.trial import unittest
+from allmydata.test.no_network import GridTestMixin
+from allmydata.util import fileutil
+from allmydata.scripts import runner, debug
+from allmydata.scripts.common import get_aliases
+from twisted.internet import defer, threads # CLI tests use deferToThread
+from allmydata.interfaces import IDirectoryNode
+
+
+class CLITestMixin:
+    def do_cli(self, verb, *args, **kwargs):
+        nodeargs = [
+            "--node-directory", self.get_clientdir(),
+            ]
+        if verb == "debug":
+            argv = [verb, args[0]] + nodeargs + list(args[1:])
+        else:
+            argv = [verb] + nodeargs + list(args)
+        stdin = kwargs.get("stdin", "")
+        stdout, stderr = StringIO(), StringIO()
+        d = threads.deferToThread(runner.runner, argv, run_by_human=False,
+                                  stdin=StringIO(stdin),
+                                  stdout=stdout, stderr=stderr)
+        def _done(rc):
+            return rc, stdout.getvalue(), stderr.getvalue()
+        d.addCallback(_done)
+        return d
+
+class Consolidate(GridTestMixin, CLITestMixin, unittest.TestCase):
+
+    def writeto(self, path, data):
+        d = os.path.dirname(os.path.join(self.basedir, "home", path))
+        fileutil.make_dirs(d)
+        f = open(os.path.join(self.basedir, "home", path), "w")
+        f.write(data)
+        f.close()
+
+    def writeto_snapshot(self, sn, path, data):
+        p = "Backups/fluxx/Archives/2009-03-%02d 01.01.01/%s" % (sn, path)
+        return self.writeto(p, data)
+
+    def do_cli_good(self, verb, *args, **kwargs):
+        d = self.do_cli(verb, *args, **kwargs)
+        def _check((rc,out,err)):
+            self.failUnlessEqual(err, "", verb)
+            self.failUnlessEqual(rc, 0, verb)
+            return out
+        d.addCallback(_check)
+        return d
+
+    def test_arg_parsing(self):
+        self.basedir = "consolidate/Consolidate/arg_parsing"
+        self.set_up_grid(num_clients=1, num_servers=1)
+        co = debug.ConsolidateOptions()
+        co.parseOptions(["--node-directory", self.get_clientdir(),
+                         "--dbfile", "foo.db", "--backupfile", "backup", "--really",
+                         "URI:DIR2:foo"])
+        self.failUnlessEqual(co["dbfile"], "foo.db")
+        self.failUnlessEqual(co["backupfile"], "backup")
+        self.failUnless(co["really"])
+        self.failUnlessEqual(co.where, "URI:DIR2:foo")
+
+    def OFF_test_basic(self):
+        # rename this method to enable the test. I've disabled it because, in
+        # my opinion:
+        #
+        #  1: 'tahoe debug consolidate' is useful enough to include in trunk,
+        #     but not useful enough justify a lot of compatibility effort or
+        #     extra test time
+        #  2: it requires sqlite3; I did not bother to make it work with
+        #     pysqlite, nor did I bother making it fail gracefully when
+        #     sqlite3 is not available
+        #  3: this test takes 30 seconds to run on my workstation, and it likely
+        #     to take several minutes on the old slow dapper buildslave
+        #  4: I don't want other folks to see a SkipTest and wonder "oh no, what
+        #     did I do wrong to not allow this test to run"
+        #
+        # These may not be strong arguments: I welcome feedback. In particular,
+        # this command may be more suitable for a plugin of some sort, if we
+        # had plugins of some sort. -warner 12-Mar-09
+
+        self.basedir = "consolidate/Consolidate/basic"
+        self.set_up_grid(num_clients=1)
+
+        fileutil.make_dirs(os.path.join(self.basedir, "home/Backups/nonsystem"))
+        fileutil.make_dirs(os.path.join(self.basedir, "home/Backups/fluxx/Latest"))
+        self.writeto(os.path.join(self.basedir,
+                                  "home/Backups/fluxx/Archives/nondir"),
+                     "not a directory: ignore me")
+
+        # set up a number of non-shared "snapshots"
+        for i in range(1,8):
+            self.writeto_snapshot(i, "parent/README", "README")
+            self.writeto_snapshot(i, "parent/foo.txt", "foo")
+            self.writeto_snapshot(i, "parent/subdir1/bar.txt", "bar")
+            self.writeto_snapshot(i, "parent/subdir1/baz.txt", "baz")
+            self.writeto_snapshot(i, "parent/subdir2/yoy.txt", "yoy")
+            self.writeto_snapshot(i, "parent/subdir2/hola.txt", "hola")
+
+            if i >= 1:
+                pass # initial snapshot
+            if i >= 2:
+                pass # second snapshot: same as the first
+            if i >= 3:
+                # modify a file
+                self.writeto_snapshot(i, "parent/foo.txt", "FOOF!")
+            if i >= 4:
+                # foo.txt goes back to normal
+                self.writeto_snapshot(i, "parent/foo.txt", "foo")
+            if i >= 5:
+                # new file
+                self.writeto_snapshot(i, "parent/subdir1/new.txt", "new")
+            if i >= 6:
+                # copy parent/subdir1 to parent/subdir2/copy1
+                self.writeto_snapshot(i, "parent/subdir2/copy1/bar.txt", "bar")
+                self.writeto_snapshot(i, "parent/subdir2/copy1/baz.txt", "baz")
+                self.writeto_snapshot(i, "parent/subdir2/copy1/new.txt", "new")
+            if i >= 7:
+                # the last snapshot shall remain untouched
+                pass
+
+        # now copy the whole thing into tahoe
+        d = self.do_cli_good("create-alias", "tahoe")
+        d.addCallback(lambda ign:
+                      self.do_cli_good("cp", "-r",
+                                       os.path.join(self.basedir, "home/Backups"),
+                                       "tahoe:Backups"))
+        def _copied(res):
+            rootcap = get_aliases(self.get_clientdir())["tahoe"]
+            # now scan the initial directory structure
+            n = self.g.clients[0].create_node_from_uri(rootcap)
+            return n.get_child_at_path([u"Backups", u"fluxx", u"Archives"])
+        d.addCallback(_copied)
+        self.nodes = {}
+        self.caps = {}
+        def stash(node, name):
+            self.nodes[name] = node
+            self.caps[name] = node.get_uri()
+            return node
+        d.addCallback(stash, "Archives")
+        self.manifests = {}
+        def stash_manifest(manifest, which):
+            self.manifests[which] = dict(manifest)
+        d.addCallback(lambda ignored: self.build_manifest(self.nodes["Archives"]))
+        d.addCallback(stash_manifest, "start")
+        def c(n):
+            pieces = n.split("-")
+            which = "finish"
+            if len(pieces) == 3:
+                which = pieces[-1]
+            sn = int(pieces[0])
+            name = pieces[1]
+            path = [u"2009-03-%02d 01.01.01" % sn]
+            path.extend( {"b": [],
+                          "bp": [u"parent"],
+                          "bps1": [u"parent", u"subdir1"],
+                          "bps2": [u"parent", u"subdir2"],
+                          "bps2c1": [u"parent", u"subdir2", u"copy1"],
+                          }[name] )
+            return self.manifests[which][tuple(path)]
+
+        dbfile = os.path.join(self.basedir, "dirhash.db")
+        backupfile = os.path.join(self.basedir, "backup.pickle")
+
+        d.addCallback(lambda ign:
+                      self.do_cli_good("debug", "consolidate",
+                                       "--dbfile", dbfile,
+                                       "--backupfile", backupfile,
+                                       "tahoe:"))
+        def _check_consolidate_output1(out):
+            lines = out.splitlines()
+            last = lines[-1]
+            self.failUnlessEqual(last.strip(),
+                                 "system done, "
+                                 "7 dirs created, 2 used as-is, 13 reused")
+            self.failUnless(os.path.exists(dbfile))
+            self.failUnless(os.path.exists(backupfile))
+            backup = pickle.load(open(backupfile, "rb"))
+            self.failUnless(u"fluxx" in backup["systems"])
+            self.failUnless(u"fluxx" in backup["archives"])
+            adata = backup["archives"]["fluxx"]
+            kids = adata[u"children"]
+            self.failUnlessEqual(str(kids[u"2009-03-01 01.01.01"][1][u"rw_uri"]),
+                                 c("1-b-start"))
+        d.addCallback(_check_consolidate_output1)
+        d.addCallback(lambda ign:
+                      self.do_cli_good("debug", "consolidate",
+                                       "--dbfile", dbfile,
+                                       "--backupfile", backupfile,
+                                       "--really", "tahoe:"))
+        def _check_consolidate_output2(out):
+            lines = out.splitlines()
+            last = lines[-1]
+            self.failUnlessEqual(last.strip(),
+                                 "system done, "
+                                 "0 dirs created, 0 used as-is, 0 reused")
+        d.addCallback(_check_consolidate_output2)
+
+        d.addCallback(lambda ignored: self.build_manifest(self.nodes["Archives"]))
+        d.addCallback(stash_manifest, "finish")
+
+        def check_consolidation(ignored):
+            #for which in ("finish",):
+            #    for path in sorted(self.manifests[which].keys()):
+            #        print "%s %s %s" % (which, "/".join(path),
+            #                            self.manifests[which][path])
+
+            # last snapshot should be untouched
+            self.failUnlessEqual(c("7-b"), c("7-b-start"))
+
+            # first snapshot should be a readonly form of the original
+            from allmydata.scripts.tahoe_backup import readonly
+            self.failUnlessEqual(c("1-b-finish"), readonly(c("1-b-start")))
+            self.failUnlessEqual(c("1-bp-finish"), readonly(c("1-bp-start")))
+            self.failUnlessEqual(c("1-bps1-finish"), readonly(c("1-bps1-start")))
+            self.failUnlessEqual(c("1-bps2-finish"), readonly(c("1-bps2-start")))
+
+            # new directories should be different than the old ones
+            self.failIfEqual(c("1-b"), c("1-b-start"))
+            self.failIfEqual(c("1-bp"), c("1-bp-start"))
+            self.failIfEqual(c("1-bps1"), c("1-bps1-start"))
+            self.failIfEqual(c("1-bps2"), c("1-bps2-start"))
+            self.failIfEqual(c("2-b"), c("2-b-start"))
+            self.failIfEqual(c("2-bp"), c("2-bp-start"))
+            self.failIfEqual(c("2-bps1"), c("2-bps1-start"))
+            self.failIfEqual(c("2-bps2"), c("2-bps2-start"))
+            self.failIfEqual(c("3-b"), c("3-b-start"))
+            self.failIfEqual(c("3-bp"), c("3-bp-start"))
+            self.failIfEqual(c("3-bps1"), c("3-bps1-start"))
+            self.failIfEqual(c("3-bps2"), c("3-bps2-start"))
+            self.failIfEqual(c("4-b"), c("4-b-start"))
+            self.failIfEqual(c("4-bp"), c("4-bp-start"))
+            self.failIfEqual(c("4-bps1"), c("4-bps1-start"))
+            self.failIfEqual(c("4-bps2"), c("4-bps2-start"))
+            self.failIfEqual(c("5-b"), c("5-b-start"))
+            self.failIfEqual(c("5-bp"), c("5-bp-start"))
+            self.failIfEqual(c("5-bps1"), c("5-bps1-start"))
+            self.failIfEqual(c("5-bps2"), c("5-bps2-start"))
+
+            # snapshot 1 and snapshot 2 should be identical
+            self.failUnlessEqual(c("2-b"), c("1-b"))
+
+            # snapshot 3 modified a file underneath parent/
+            self.failIfEqual(c("3-b"), c("2-b")) # 3 modified a file
+            self.failIfEqual(c("3-bp"), c("2-bp"))
+            # but the subdirs are the same
+            self.failUnlessEqual(c("3-bps1"), c("2-bps1"))
+            self.failUnlessEqual(c("3-bps2"), c("2-bps2"))
+
+            # snapshot 4 should be the same as 2
+            self.failUnlessEqual(c("4-b"), c("2-b"))
+            self.failUnlessEqual(c("4-bp"), c("2-bp"))
+            self.failUnlessEqual(c("4-bps1"), c("2-bps1"))
+            self.failUnlessEqual(c("4-bps2"), c("2-bps2"))
+
+            # snapshot 5 added a file under subdir1
+            self.failIfEqual(c("5-b"), c("4-b"))
+            self.failIfEqual(c("5-bp"), c("4-bp"))
+            self.failIfEqual(c("5-bps1"), c("4-bps1"))
+            self.failUnlessEqual(c("5-bps2"), c("4-bps2"))
+
+            # snapshot 6 copied a directory-it should be shared
+            self.failIfEqual(c("6-b"), c("5-b"))
+            self.failIfEqual(c("6-bp"), c("5-bp"))
+            self.failUnlessEqual(c("6-bps1"), c("5-bps1"))
+            self.failIfEqual(c("6-bps2"), c("5-bps2"))
+            self.failUnlessEqual(c("6-bps2c1"), c("6-bps1"))
+
+        d.addCallback(check_consolidation)
+
+        return d
+
+    def build_manifest(self, root):
+        # like dirnode.build_manifest, but this one doesn't skip duplicate
+        # nodes (i.e. it is not cycle-resistant).
+        manifest = []
+        manifest.append( ( (), root.get_uri() ) )
+        d = self.manifest_of(None, root, manifest, () )
+        d.addCallback(lambda ign: manifest)
+        return d
+
+    def manifest_of(self, ignored, dirnode, manifest, path):
+        d = dirnode.list()
+        def _got_children(children):
+            d = defer.succeed(None)
+            for name, (child, metadata) in children.iteritems():
+                childpath = path + (name,)
+                manifest.append( (childpath, child.get_uri()) )
+                if IDirectoryNode.providedBy(child):
+                    d.addCallback(self.manifest_of, child, manifest, childpath)
+            return d
+        d.addCallback(_got_children)
+        return d