From: Brian Warner Date: Wed, 22 Oct 2008 07:55:52 +0000 (-0700) Subject: dirnode.py: check for cancel during deep-traverse operations, and don't initiate... X-Git-Url: https://git.rkrishnan.org/pf/content/en/footer/COPYING.TGPPL.html?a=commitdiff_plain;h=8178b10ef18f078de6efec33293960d64943028b;p=tahoe-lafs%2Ftahoe-lafs.git dirnode.py: check for cancel during deep-traverse operations, and don't initiate any new ones if we've been cancelled. Gets us closer to #514. --- diff --git a/src/allmydata/dirnode.py b/src/allmydata/dirnode.py index a4bebcbd..f36376a6 100644 --- a/src/allmydata/dirnode.py +++ b/src/allmydata/dirnode.py @@ -477,22 +477,27 @@ class NewDirectoryNode: found = set([self.get_verifier()]) limiter = ConcurrencyLimiter(10) - d = self._deep_traverse_dirnode(self, [], walker, found, limiter) + d = self._deep_traverse_dirnode(self, [], + walker, monitor, found, limiter) d.addCallback(lambda ignored: walker.finish()) d.addBoth(monitor.finish) + d.addErrback(lambda f: None) + return monitor - def _deep_traverse_dirnode(self, node, path, walker, found, limiter): + def _deep_traverse_dirnode(self, node, path, + walker, monitor, found, limiter): # process this directory, then walk its children - # TODO: check monitor.is_cancelled() + monitor.raise_if_cancelled() d = limiter.add(walker.add_node, node, path) d.addCallback(lambda ignored: limiter.add(node.list)) d.addCallback(self._deep_traverse_dirnode_children, node, path, - walker, found, limiter) + walker, monitor, found, limiter) return d def _deep_traverse_dirnode_children(self, children, parent, path, - walker, found, limiter): + walker, monitor, found, limiter): + monitor.raise_if_cancelled() dl = [limiter.add(walker.enter_directory, parent, children)] for name, (child, metadata) in children.iteritems(): verifier = child.get_verifier() @@ -502,10 +507,11 @@ class NewDirectoryNode: childpath = path + [name] if IDirectoryNode.providedBy(child): dl.append(self._deep_traverse_dirnode(child, childpath, - walker, found, limiter)) + walker, monitor, + found, limiter)) else: dl.append(limiter.add(walker.add_node, child, childpath)) - return defer.DeferredList(dl, fireOnOneErrback=True) + return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True) def build_manifest(self): diff --git a/src/allmydata/monitor.py b/src/allmydata/monitor.py index 22de7713..c9f89d5e 100644 --- a/src/allmydata/monitor.py +++ b/src/allmydata/monitor.py @@ -32,6 +32,11 @@ class IMonitor(Interface): operation code should stop creating new work, and attempt to stop any work already in progress.""" + def raise_if_cancelled(self): + """Raise OperationCancelledError if the operation has been cancelled. + Operation code that has a robust error-handling path can simply call + this periodically.""" + def set_status(self, status): """Sets the Monitor's 'status' object to an arbitrary value. Different operations will store different sorts of status information @@ -69,6 +74,9 @@ class IMonitor(Interface): # get_status() is useful too, but it is operation-specific +class OperationCancelledError(Exception): + pass + class Monitor: implements(IMonitor) @@ -81,6 +89,10 @@ class Monitor: def is_cancelled(self): return self.cancelled + def raise_if_cancelled(self): + if self.cancelled: + raise OperationCancelledError() + def is_finished(self): return self.finished diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 9915c7cd..64524822 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -16,6 +16,7 @@ from allmydata.scripts import runner from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \ ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \ IDeepCheckAndRepairResults +from allmydata.monitor import OperationCancelledError from allmydata.mutable.common import NotMutableError from allmydata.mutable import layout as mutable_layout from foolscap import DeadReferenceError @@ -2048,6 +2049,23 @@ class DeepCheckWeb(SystemTestMixin, unittest.TestCase, WebErrorMixin): self.root.start_deep_check_and_repair(verify=True).when_done()) d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root") + # and finally, start a deep-check, but then cancel it. + d.addCallback(lambda ign: self.root.start_deep_check()) + def _checking(monitor): + monitor.cancel() + d = monitor.when_done() + # this should fire as soon as the next dirnode.list finishes. + # TODO: add a counter to measure how many list() calls are made, + # assert that no more than one gets to run before the cancel() + # takes effect. + def _finished_normally(res): + self.fail("this was supposed to fail, not finish normally") + def _cancelled(f): + f.trap(OperationCancelledError) + d.addCallbacks(_finished_normally, _cancelled) + return d + d.addCallback(_checking) + return d def web_json(self, n, **kwargs):