import urllib
import time
+from zope.interface import implements
from twisted.internet import defer
+from twisted.internet.interfaces import IPushProducer
from twisted.python.failure import Failure
from twisted.web import http, html
from nevow import url, rend, inevow, tags as T
from allmydata.uri import from_string_dirnode
from allmydata.interfaces import IDirectoryNode, IFileNode, IMutableFileNode, \
ExistingChildError, NoSuchChildError
-from allmydata.monitor import Monitor
+from allmydata.monitor import Monitor, OperationCancelledError
from allmydata import dirnode
from allmydata.web.common import text_plain, WebError, \
IClient, IOpHandleTable, NeedOperationHandleError, \
def _POST_stream_manifest(self, ctx):
walker = ManifestStreamer(ctx, self.node)
monitor = self.node.deep_traverse(walker)
- return monitor.when_done()
+ walker.setMonitor(monitor)
+ # register to hear stopProducing. The walker ignores pauseProducing.
+ IRequest(ctx).registerProducer(walker, True)
+ d = monitor.when_done()
+ def _done(res):
+ IRequest(ctx).unregisterProducer()
+ return res
+ d.addBoth(_done)
+ def _cancelled(f):
+ f.trap(OperationCancelledError)
+ return "Operation Cancelled"
+ d.addErrback(_cancelled)
+ return d
def _POST_set_children(self, req):
replace = boolean_of_arg(get_arg(req, "replace", "true"))
return simplejson.dumps(s, indent=1)
class ManifestStreamer(dirnode.DeepStats):
+ implements(IPushProducer)
def __init__(self, ctx, origin):
dirnode.DeepStats.__init__(self, origin)
self.req = IRequest(ctx)
+ def setMonitor(self, monitor):
+ self.monitor = monitor
+ def pauseProducing(self):
+ pass
+ def resumeProducing(self):
+ pass
+ def stopProducing(self):
+ self.monitor.cancel()
+
def add_node(self, node, path):
dirnode.DeepStats.add_node(self, node, path)
d = {"path": path,