from allmydata.mutable.common import NotMutableError
from allmydata.mutable.node import MutableFileNode
from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
- IURI, IFileNode, IMutableFileURI, IVerifierURI, IFilesystemNode, \
+ IURI, IFileNode, IMutableFileURI, IFilesystemNode, \
ExistingChildError, ICheckable
from allmydata.checker_results import DeepCheckResults, \
DeepCheckAndRepairResults
d.addCallback(lambda child: self.delete(current_child_name))
return d
- def build_manifest(self):
- """Return a frozenset of verifier-capability strings for all nodes
- (directories and files) reachable from this one."""
+
+ def deep_traverse(self, walker):
+ """Perform a recursive walk, using this dirnode as a root, notifying
+ the 'walker' instance of everything I encounter.
+
+ I call walker.enter_directory(parent, children) once for each dirnode
+ I visit, immediately after retrieving the list of children. I pass in
+ the parent dirnode and the dict of childname->(childnode,metadata).
+ This function should *not* traverse the children: I will do that.
+ enter_directory() is most useful for the deep-stats number that
+ counts how large a directory is.
+
+ I call walker.add_node(node, path) for each node (both files and
+ directories) I can reach. Most work should be done here.
+
+ I avoid loops by keeping track of verifier-caps and refusing to call
+ each() or traverse a node that I've seen before.
+
+ I return a Deferred that will fire with the value of walker.finish().
+ """
# this is just a tree-walker, except that following each edge
# requires a Deferred. We use a ConcurrencyLimiter to make sure the
# fan-out doesn't cause problems.
- manifest = set()
- manifest.add(self.get_verifier())
- limiter = ConcurrencyLimiter(10) # allow 10 in parallel
-
- d = self._build_manifest_from_node(self, manifest, limiter)
- def _done(res):
- # LIT nodes have no verifier-capability: their data is stored
- # inside the URI itself, so there is no need to refresh anything.
- # They indicate this by returning None from their get_verifier
- # method. We need to remove any such Nones from our set. We also
- # want to convert all these caps into strings.
- return frozenset([IVerifierURI(cap).to_string()
- for cap in manifest
- if cap is not None])
- d.addCallback(_done)
+ found = set([self.get_verifier()])
+ limiter = ConcurrencyLimiter(10)
+ d = self._deep_traverse_dirnode(self, [], walker, found, limiter)
+ d.addCallback(lambda ignored: walker.finish())
return d
- def _build_manifest_from_node(self, node, manifest, limiter):
- d = limiter.add(node.list)
- def _got_list(res):
- dl = []
- for name, (child, metadata) in res.iteritems():
- verifier = child.get_verifier()
- if verifier not in manifest:
- manifest.add(verifier)
- if IDirectoryNode.providedBy(child):
- dl.append(self._build_manifest_from_node(child,
- manifest,
- limiter))
- if dl:
- return defer.DeferredList(dl)
- d.addCallback(_got_list)
+ def _deep_traverse_dirnode(self, node, path, walker, found, limiter):
+ # process this directory, then walk its children
+ d = limiter.add(walker.add_node, node, path)
+ d.addCallback(lambda ignored: node.list())
+ d.addCallback(self._deep_traverse_dirnode_children, node, path,
+ walker, found, limiter)
return d
- def deep_stats(self):
- stats = DeepStats()
- # we track verifier caps, to avoid double-counting children for which
- # we've got both a write-cap and a read-cap
- found = set()
- found.add(self.get_verifier())
+ def _deep_traverse_dirnode_children(self, children, parent, path,
+ walker, found, limiter):
+ dl = [limiter.add(walker.enter_directory, parent, children)]
+ for name, (child, metadata) in children.iteritems():
+ verifier = child.get_verifier()
+ if verifier in found:
+ continue
+ found.add(verifier)
+ childpath = path + [name]
+ if IDirectoryNode.providedBy(child):
+ dl.append(self._deep_traverse_dirnode(child, childpath,
+ walker, found, limiter))
+ else:
+ dl.append(limiter.add(walker.add_node, child, childpath))
+ return defer.DeferredList(dl, fireOnOneErrback=True)
- limiter = ConcurrencyLimiter(10)
- d = self._add_deepstats_from_node(self, found, stats, limiter)
- d.addCallback(lambda res: stats.get_results())
- return d
+ def build_manifest(self):
+ """Return a frozenset of verifier-capability strings for all nodes
+ (directories and files) reachable from this one."""
+ return self.deep_traverse(ManifestWalker())
- def _add_deepstats_from_node(self, node, found, stats, limiter):
- d = limiter.add(node.list)
- def _got_list(children):
- dl = []
- dirsize_bytes = node.get_size()
- dirsize_children = len(children)
- stats.add("count-directories")
- stats.add("size-directories", dirsize_bytes)
- stats.max("largest-directory", dirsize_bytes)
- stats.max("largest-directory-children", dirsize_children)
- for name, (child, metadata) in children.iteritems():
- verifier = child.get_verifier()
- if verifier in found:
- continue
- found.add(verifier)
- if IDirectoryNode.providedBy(child):
- dl.append(self._add_deepstats_from_node(child, found,
- stats, limiter))
- elif IMutableFileNode.providedBy(child):
- stats.add("count-files")
- stats.add("count-mutable-files")
- # TODO: update the servermap, compute a size, add it to
- # size-mutable-files, max it into "largest-mutable-file"
- elif IFileNode.providedBy(child): # CHK and LIT
- stats.add("count-files")
- size = child.get_size()
- stats.histogram("size-files-histogram", size)
- if child.get_uri().startswith("URI:LIT:"):
- stats.add("count-literal-files")
- stats.add("size-literal-files", size)
- else:
- stats.add("count-immutable-files")
- stats.add("size-immutable-files", size)
- stats.max("largest-immutable-file", size)
- if dl:
- return defer.DeferredList(dl)
- d.addCallback(_got_list)
- return d
+ def deep_stats(self):
+ # Since deep_traverse tracks verifier caps, we avoid double-counting
+ # children for which we've got both a write-cap and a read-cap
+ return self.deep_traverse(DeepStats())
def deep_check(self, verify=False):
- return self.deep_check_base(verify, False)
- def deep_check_and_repair(self, verify=False):
- return self.deep_check_base(verify, True)
-
- def deep_check_base(self, verify, repair):
- # shallow-check each object first, then traverse children
- root_si = self._node.get_storage_index()
- self._lp = log.msg(format="deep-check starting (%(si)s),"
- " verify=%(verify)s, repair=%(repair)s",
- si=base32.b2a(root_si), verify=verify, repair=repair)
- if repair:
- results = DeepCheckAndRepairResults(root_si)
- else:
- results = DeepCheckResults(root_si)
- found = set()
- limiter = ConcurrencyLimiter(10)
+ return self.deep_traverse(DeepChecker(self, verify, repair=False))
- d = self._add_deepcheck_from_node([], self, results, found, limiter,
- verify, repair)
- def _done(res):
- log.msg("deep-check done", parent=self._lp)
- return results
- d.addCallback(_done)
- return d
-
- def _add_deepcheck_from_node(self, path, node, results, found, limiter,
- verify, repair):
- verifier = node.get_verifier()
- if verifier in found:
- # avoid loops
- return None
- found.add(verifier)
-
- if repair:
- d = limiter.add(node.check_and_repair, verify)
- d.addCallback(results.add_check_and_repair, path)
- else:
- d = limiter.add(node.check, verify)
- d.addCallback(results.add_check, path)
+ def deep_check_and_repair(self, verify=False):
+ return self.deep_traverse(DeepChecker(self, verify, repair=True))
- # TODO: stats: split the DeepStats.foo calls out of
- # _add_deepstats_from_node into a separate non-recursing method, call
- # it from both here and _add_deepstats_from_node.
- if IDirectoryNode.providedBy(node):
- d.addCallback(lambda res: node.list())
- def _got_children(children):
- dl = []
- for name, (child, metadata) in children.iteritems():
- childpath = path + [name]
- d2 = self._add_deepcheck_from_node(childpath, child,
- results,
- found, limiter,
- verify, repair)
- if d2:
- dl.append(d2)
- if dl:
- return defer.DeferredList(dl, fireOnOneErrback=True)
- d.addCallback(_got_children)
- return d
+class ManifestWalker:
+ def __init__(self):
+ self.manifest = set()
+ def add_node(self, node, path):
+ v = node.get_verifier()
+ # LIT files have no verify-cap, so don't add them
+ if v:
+ assert not isinstance(v, str), "ICK: %s %s" % (v, node)
+ self.manifest.add(v.to_string())
+ def enter_directory(self, parent, children):
+ pass
+ def finish(self):
+ return frozenset(self.manifest)
class DeepStats:
self.buckets = [ (0,0), (1,3)]
self.root = math.sqrt(10)
+ def add_node(self, node, childpath):
+ if IDirectoryNode.providedBy(node):
+ self.add("count-directories")
+ elif IMutableFileNode.providedBy(node):
+ self.add("count-files")
+ self.add("count-mutable-files")
+ # TODO: update the servermap, compute a size, add it to
+ # size-mutable-files, max it into "largest-mutable-file"
+ elif IFileNode.providedBy(node): # CHK and LIT
+ self.add("count-files")
+ size = node.get_size()
+ self.histogram("size-files-histogram", size)
+ if node.get_uri().startswith("URI:LIT:"):
+ self.add("count-literal-files")
+ self.add("size-literal-files", size)
+ else:
+ self.add("count-immutable-files")
+ self.add("size-immutable-files", size)
+ self.max("largest-immutable-file", size)
+
+ def enter_directory(self, parent, children):
+ dirsize_bytes = parent.get_size()
+ dirsize_children = len(children)
+ self.add("size-directories", dirsize_bytes)
+ self.max("largest-directory", dirsize_bytes)
+ self.max("largest-directory-children", dirsize_children)
+
def add(self, key, value=1):
self.stats[key] += value
stats[key] = out
return stats
+ def finish(self):
+ return self.get_results()
+
+
+class DeepChecker:
+ def __init__(self, root, verify, repair):
+ root_si = root.get_storage_index()
+ self._lp = log.msg(format="deep-check starting (%(si)s),"
+ " verify=%(verify)s, repair=%(repair)s",
+ si=base32.b2a(root_si), verify=verify, repair=repair)
+ self._verify = verify
+ self._repair = repair
+ if repair:
+ self._results = DeepCheckAndRepairResults(root_si)
+ else:
+ self._results = DeepCheckResults(root_si)
+ self._stats = DeepStats()
+
+ def add_node(self, node, childpath):
+ if self._repair:
+ d = node.check_and_repair(self._verify)
+ d.addCallback(self._results.add_check_and_repair, childpath)
+ else:
+ d = node.check(self._verify)
+ d.addCallback(self._results.add_check, childpath)
+ d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
+ return d
+
+ def enter_directory(self, parent, children):
+ return self._stats.enter_directory(parent, children)
+
+ def finish(self):
+ log.msg("deep-check done", parent=self._lp)
+ self._results.update_stats(self._stats.get_results())
+ return self._results
+
# use client.create_dirnode() to make one of these