from allmydata.util import hashutil, mathutil, base32, log
from allmydata.util.assertutil import _assert, precondition
from allmydata.util.hashutil import netstring
-from allmydata.util.limiter import ConcurrencyLimiter
from allmydata.util.netstring import split_netstring
from allmydata.uri import NewDirectoryURI, LiteralFileURI, from_string
from pycryptopp.cipher.aes import AES
I call walker.add_node(node, path) for each node (both files and
directories) I can reach. Most work should be done here.
I avoid loops by keeping track of verifier-caps and refusing to call
- each() or traverse a node that I've seen before.
+ walker.add_node() or traverse a node that I've seen before. This
+ means that any file or directory will only be given to the walker
+ once. If files or directories are referenced multiple times by a
+ directory structure, this may appear to under-count or miss some of
+ them.
I return a Monitor; its when_done() Deferred will fire with the value
of walker.finish().
"""
# this is just a tree-walker, except that following each edge
- # requires a Deferred. We use a ConcurrencyLimiter to make sure the
- # fan-out doesn't cause problems.
+ # requires a Deferred. We used to use a ConcurrencyLimiter to limit
+ # fanout to 10 simultaneous operations, but the memory load of the
+ # queued operations was excessive (in one case, with 330k dirnodes,
+ # it caused the process to run into the 3.0GB-ish per-process 32-bit
+ # Linux memory limit, and crashed). So we use a single big Deferred
+ # chain, and do a strict depth-first traversal, one node at a time.
+ # This can be slower, because we aren't pipelining directory reads,
+ # but it brought the memory footprint down by roughly 50%. (A sketch
+ # of this chaining pattern follows this method.)
monitor = Monitor()
walker.set_monitor(monitor)
found = set([self.get_verify_cap()])
- limiter = ConcurrencyLimiter(10)
- d = self._deep_traverse_dirnode(self, [],
- walker, monitor, found, limiter)
+ d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
d.addCallback(lambda ignored: walker.finish())
d.addBoth(monitor.finish)
d.addErrback(lambda f: None)
return monitor
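# --- Illustrative sketch, not part of the patch: the chaining pattern
# described in the comment above. Instead of collecting one Deferred
# per child into a DeferredList (which keeps every queued operation
# alive simultaneously), each step is appended as a callback on a
# single Deferred, so only one operation is outstanding at a time.
# walk()/visit() and the node.list() return shape are hypothetical
# stand-ins for this sketch, not the real dirnode API.
from twisted.internet import defer

def walk(node, visit):
    d = defer.maybeDeferred(visit, node)  # visit may be sync or async
    d.addCallback(lambda ignored: node.list())
    def _chain_children(children):
        d2 = defer.succeed(None)
        for child in children:
            # chaining, rather than firing all children at once,
            # keeps the traversal strictly depth-first
            d2.addCallback(lambda ignored, c=child: walk(c, visit))
        return d2
    d.addCallback(_chain_children)
    return d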
- def _deep_traverse_dirnode(self, node, path,
- walker, monitor, found, limiter):
+ def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
# process this directory, then walk its children
monitor.raise_if_cancelled()
- d = limiter.add(walker.add_node, node, path)
- d.addCallback(lambda ignored: limiter.add(node.list))
+ d = defer.maybeDeferred(walker.add_node, node, path)
+ d.addCallback(lambda ignored: node.list())
d.addCallback(self._deep_traverse_dirnode_children, node, path,
- walker, monitor, found, limiter)
+ walker, monitor, found)
return d
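# --- Illustrative sketch, not part of the patch: why the code above
# wraps walker calls in defer.maybeDeferred. A walker's add_node() may
# return either a plain value or a Deferred; maybeDeferred wraps both
# cases (and any synchronous exception) in a Deferred, so the chain
# can treat them uniformly. The two add_node variants here are
# hypothetical.
from twisted.internet import defer

def sync_add_node(node, path):
    return len(path)                    # plain value

def async_add_node(node, path):
    return defer.succeed(len(path))     # already a Deferred

for fn in (sync_add_node, async_add_node):
    d = defer.maybeDeferred(fn, None, ["a", "b"])
    d.addCallback(lambda depth: depth)  # both cases arrive here as 2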
def _deep_traverse_dirnode_children(self, children, parent, path,
- walker, monitor, found, limiter):
+ walker, monitor, found):
monitor.raise_if_cancelled()
- dl = [limiter.add(walker.enter_directory, parent, children)]
+ d = defer.maybeDeferred(walker.enter_directory, parent, children)
+ # we process file-like children first, so we can drop their FileNode
+ # objects as quickly as possible. Tests suggest that a FileNode (held
+ # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
+ # in the nodecache) seem to consume about 2000 bytes.
+ dirkids = []
+ filekids = []
for name, (child, metadata) in children.iteritems():
verifier = child.get_verify_cap()
# allow LIT files (for which verifier==None) to be processed
if (verifier is not None) and (verifier in found):
    continue
found.add(verifier)
childpath = path + [name]
if IDirectoryNode.providedBy(child):
- dl.append(self._deep_traverse_dirnode(child, childpath,
- walker, monitor,
- found, limiter))
+ dirkids.append( (child, childpath) )
else:
- dl.append(limiter.add(walker.add_node, child, childpath))
- return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
+ filekids.append( (child, childpath) )
+ for (child, childpath) in filekids:
+ d.addCallback(lambda ignored, child=child, childpath=childpath:
+ walker.add_node(child, childpath))
+ for (child, childpath) in dirkids:
+ d.addCallback(lambda ignored, child=child, childpath=childpath:
+ self._deep_traverse_dirnode(child, childpath,
+ walker, monitor,
+ found))
+ return d
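# --- Illustrative sketch, not part of the patch: the child=child,
# childpath=childpath defaults in the lambdas above are essential.
# Without them, every callback would close over the loop variables and
# see only their final values by the time the Deferred chain fires.
bound = []
for i in range(3):
    bound.append(lambda ignored, i=i: i)    # i=i captures each value
assert [cb(None) for cb in bound] == [0, 1, 2]

unbound = [lambda ignored: i for i in range(3)]  # all share one i
assert [cb(None) for cb in unbound] == [2, 2, 2]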
def build_manifest(self):