found = set([self.get_verifier()])
limiter = ConcurrencyLimiter(10)
- d = self._deep_traverse_dirnode(self, [], walker, found, limiter)
+ d = self._deep_traverse_dirnode(self, [],
+ walker, monitor, found, limiter)
d.addCallback(lambda ignored: walker.finish())
d.addBoth(monitor.finish)
+ d.addErrback(lambda f: None)
+
return monitor
- def _deep_traverse_dirnode(self, node, path, walker, found, limiter):
+ def _deep_traverse_dirnode(self, node, path,
+ walker, monitor, found, limiter):
# process this directory, then walk its children
- # TODO: check monitor.is_cancelled()
+ monitor.raise_if_cancelled()
d = limiter.add(walker.add_node, node, path)
d.addCallback(lambda ignored: limiter.add(node.list))
d.addCallback(self._deep_traverse_dirnode_children, node, path,
- walker, found, limiter)
+ walker, monitor, found, limiter)
return d
def _deep_traverse_dirnode_children(self, children, parent, path,
- walker, found, limiter):
+ walker, monitor, found, limiter):
+ monitor.raise_if_cancelled()
dl = [limiter.add(walker.enter_directory, parent, children)]
for name, (child, metadata) in children.iteritems():
verifier = child.get_verifier()
childpath = path + [name]
if IDirectoryNode.providedBy(child):
dl.append(self._deep_traverse_dirnode(child, childpath,
- walker, found, limiter))
+ walker, monitor,
+ found, limiter))
else:
dl.append(limiter.add(walker.add_node, child, childpath))
- return defer.DeferredList(dl, fireOnOneErrback=True)
+ return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
def build_manifest(self):
operation code should stop creating new work, and attempt to stop any
work already in progress."""
+ def raise_if_cancelled(self):
+ """Raise OperationCancelledError if the operation has been cancelled.
+ Operation code that has a robust error-handling path can simply call
+ this periodically."""
+
def set_status(self, status):
"""Sets the Monitor's 'status' object to an arbitrary value.
Different operations will store different sorts of status information
# get_status() is useful too, but it is operation-specific
+class OperationCancelledError(Exception):
+ pass
+
class Monitor:
implements(IMonitor)
def is_cancelled(self):
return self.cancelled
+ def raise_if_cancelled(self):
+ if self.cancelled:
+ raise OperationCancelledError()
+
def is_finished(self):
return self.finished
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
IDeepCheckAndRepairResults
+from allmydata.monitor import OperationCancelledError
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from foolscap import DeadReferenceError
self.root.start_deep_check_and_repair(verify=True).when_done())
d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
+ # and finally, start a deep-check, but then cancel it.
+ d.addCallback(lambda ign: self.root.start_deep_check())
+ def _checking(monitor):
+ monitor.cancel()
+ d = monitor.when_done()
+ # this should fire as soon as the next dirnode.list finishes.
+ # TODO: add a counter to measure how many list() calls are made,
+ # assert that no more than one gets to run before the cancel()
+ # takes effect.
+ def _finished_normally(res):
+ self.fail("this was supposed to fail, not finish normally")
+ def _cancelled(f):
+ f.trap(OperationCancelledError)
+ d.addCallbacks(_finished_normally, _cancelled)
+ return d
+ d.addCallback(_checking)
+
return d
def web_json(self, n, **kwargs):