From: Brian Warner Date: Sat, 24 Jan 2009 02:39:08 +0000 (-0700) Subject: make streaming-manifest stop doing work after the HTTP connection is dropped X-Git-Tag: allmydata-tahoe-1.3.0~145 X-Git-Url: https://git.rkrishnan.org/pf/content/en/service/somewhere?a=commitdiff_plain;h=9d6534d78b6ef2eb2769ef49cda44fcca4ba960f;p=tahoe-lafs%2Ftahoe-lafs.git make streaming-manifest stop doing work after the HTTP connection is dropped --- diff --git a/src/allmydata/web/directory.py b/src/allmydata/web/directory.py index 27f8ac9c..dbcc34c5 100644 --- a/src/allmydata/web/directory.py +++ b/src/allmydata/web/directory.py @@ -3,7 +3,9 @@ import simplejson import urllib import time +from zope.interface import implements from twisted.internet import defer +from twisted.internet.interfaces import IPushProducer from twisted.python.failure import Failure from twisted.web import http, html from nevow import url, rend, inevow, tags as T @@ -15,7 +17,7 @@ from allmydata.util import base32 from allmydata.uri import from_string_dirnode from allmydata.interfaces import IDirectoryNode, IFileNode, IMutableFileNode, \ ExistingChildError, NoSuchChildError -from allmydata.monitor import Monitor +from allmydata.monitor import Monitor, OperationCancelledError from allmydata import dirnode from allmydata.web.common import text_plain, WebError, \ IClient, IOpHandleTable, NeedOperationHandleError, \ @@ -400,7 +402,19 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): def _POST_stream_manifest(self, ctx): walker = ManifestStreamer(ctx, self.node) monitor = self.node.deep_traverse(walker) - return monitor.when_done() + walker.setMonitor(monitor) + # register to hear stopProducing. The walker ignores pauseProducing. + IRequest(ctx).registerProducer(walker, True) + d = monitor.when_done() + def _done(res): + IRequest(ctx).unregisterProducer() + return res + d.addBoth(_done) + def _cancelled(f): + f.trap(OperationCancelledError) + return "Operation Cancelled" + d.addErrback(_cancelled) + return d def _POST_set_children(self, req): replace = boolean_of_arg(get_arg(req, "replace", "true")) @@ -822,11 +836,21 @@ class DeepStatsResults(rend.Page): return simplejson.dumps(s, indent=1) class ManifestStreamer(dirnode.DeepStats): + implements(IPushProducer) def __init__(self, ctx, origin): dirnode.DeepStats.__init__(self, origin) self.req = IRequest(ctx) + def setMonitor(self, monitor): + self.monitor = monitor + def pauseProducing(self): + pass + def resumeProducing(self): + pass + def stopProducing(self): + self.monitor.cancel() + def add_node(self, node, path): dirnode.DeepStats.add_node(self, node, path) d = {"path": path,