]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
make streaming-manifest stop doing work after the HTTP connection is dropped
authorBrian Warner <warner@allmydata.com>
Sat, 24 Jan 2009 02:39:08 +0000 (19:39 -0700)
committerBrian Warner <warner@allmydata.com>
Sat, 24 Jan 2009 02:39:08 +0000 (19:39 -0700)
src/allmydata/web/directory.py

index 27f8ac9c53144dcdeaba221ac0ee9733db651a47..dbcc34c54c1a53ce1dfb3c6c6de0cf54c6f70d12 100644 (file)
@@ -3,7 +3,9 @@ import simplejson
 import urllib
 import time
 
+from zope.interface import implements
 from twisted.internet import defer
+from twisted.internet.interfaces import IPushProducer
 from twisted.python.failure import Failure
 from twisted.web import http, html
 from nevow import url, rend, inevow, tags as T
@@ -15,7 +17,7 @@ from allmydata.util import base32
 from allmydata.uri import from_string_dirnode
 from allmydata.interfaces import IDirectoryNode, IFileNode, IMutableFileNode, \
      ExistingChildError, NoSuchChildError
-from allmydata.monitor import Monitor
+from allmydata.monitor import Monitor, OperationCancelledError
 from allmydata import dirnode
 from allmydata.web.common import text_plain, WebError, \
      IClient, IOpHandleTable, NeedOperationHandleError, \
@@ -400,7 +402,19 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
     def _POST_stream_manifest(self, ctx):
         walker = ManifestStreamer(ctx, self.node)
         monitor = self.node.deep_traverse(walker)
-        return monitor.when_done()
+        walker.setMonitor(monitor)
+        # register to hear stopProducing. The walker ignores pauseProducing.
+        IRequest(ctx).registerProducer(walker, True)
+        d = monitor.when_done()
+        def _done(res):
+            IRequest(ctx).unregisterProducer()
+            return res
+        d.addBoth(_done)
+        def _cancelled(f):
+            f.trap(OperationCancelledError)
+            return "Operation Cancelled"
+        d.addErrback(_cancelled)
+        return d
 
     def _POST_set_children(self, req):
         replace = boolean_of_arg(get_arg(req, "replace", "true"))
@@ -822,11 +836,21 @@ class DeepStatsResults(rend.Page):
         return simplejson.dumps(s, indent=1)
 
 class ManifestStreamer(dirnode.DeepStats):
+    implements(IPushProducer)
 
     def __init__(self, ctx, origin):
         dirnode.DeepStats.__init__(self, origin)
         self.req = IRequest(ctx)
 
+    def setMonitor(self, monitor):
+        self.monitor = monitor
+    def pauseProducing(self):
+        pass
+    def resumeProducing(self):
+        pass
+    def stopProducing(self):
+        self.monitor.cancel()
+
     def add_node(self, node, path):
         dirnode.DeepStats.add_node(self, node, path)
         d = {"path": path,