From: Brian Warner <>
Date: Wed, 19 Sep 2007 07:34:47 +0000 (-0700)
Subject: use producer/consumer to reduce memory usage, closes #129.
X-Git-Tag: allmydata-tahoe-0.6.0~67
X-Git-Url:;h=1340c484c6c60c524096e98ee78243c8c00a12b7;p=tahoe-lafs%2Ftahoe-lafs.git

use producer/consumer to reduce memory usage, closes #129.

If the DownloadTarget is also an IConsumer, give it control of the brakes
by offering ourselves to target.registerProducer(). When they tell us to
pause, set a flag, which is checked between segment downloads and decodes.

Make WebDownloadTarget an IConsumer and pass control along to
the http.Request, which already knows how to be an IConsumer.
This reduces the memory footprint of stalled HTTP GETs to a bare minimum,
and thus closes #129.

diff --git a/src/allmydata/ b/src/allmydata/
index 12acd64f..8e5cd05e 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -3,7 +3,9 @@ import os, random
 from zope.interface import implements
 from twisted.python import log
 from twisted.internet import defer
+from twisted.internet.interfaces import IPushProducer, IConsumer
 from twisted.application import service
+from foolscap.eventual import eventually
 from allmydata.util import idlib, mathutil, hashutil
 from allmydata.util.assertutil import _assert
@@ -23,6 +25,9 @@ class BadPlaintextHashValue(Exception):
 class BadCrypttextHashValue(Exception):
+class DownloadStopped(Exception):
+    pass
 class Output:
     def __init__(self, downloadable, key, total_length):
         self.downloadable = downloadable
@@ -282,6 +287,7 @@ class SegmentDownloader:
 class FileDownloader:
+    implements(IPushProducer)
     check_crypttext_hash = True
     check_plaintext_hash = True
@@ -295,7 +301,12 @@ class FileDownloader:
         self._size = u.size
         self._num_needed_shares = u.needed_shares
+        if IConsumer.providedBy(downloadable):
+            downloadable.registerProducer(self, True)
+        self._downloadable = downloadable
         self._output = Output(downloadable, u.key, self._size)
+        self._paused = False
+        self._stopped = False
         self.active_buckets = {} # k: shnum, v: bucket
         self._share_buckets = [] # list of (sharenum, bucket) tuples
@@ -311,8 +322,23 @@ class FileDownloader:
                                 "crypttext_hashtree": 0,
+    def pauseProducing(self):
+        if self._paused:
+            return
+        self._paused = defer.Deferred()
+    def resumeProducing(self):
+        if self._paused:
+            p = self._paused
+            self._paused = None
+            eventually(p.callback, None)
+    def stopProducing(self):
+        log.msg("Download.stopProducing")
+        self._stopped = True
     def start(self):
-        log.msg("starting download [%s]" % idlib.b2a(self._storage_index))
+        log.msg("starting download [%s]" % idlib.b2a(self._storage_index)[:6])
         # first step: who should we download from?
         d = defer.maybeDeferred(self._get_all_shareholders)
@@ -324,6 +350,11 @@ class FileDownloader:
         # once we know that, we can download blocks from everybody
+        def _finished(res):
+            if IConsumer.providedBy(self._downloadable):
+                self._downloadable.unregisterProducer()
+            return res
+        d.addBoth(_finished)
         def _failed(why):
             return why
@@ -541,20 +572,40 @@ class FileDownloader:
         d = defer.succeed(None)
         for segnum in range(self._total_segments-1):
             d.addCallback(self._download_segment, segnum)
+            # this pause, at the end of write, prevents pre-fetch from
+            # happening until the consumer is ready for more data.
+            d.addCallback(self._check_for_pause)
         d.addCallback(self._download_tail_segment, self._total_segments-1)
         return d
+    def _check_for_pause(self, res):
+        if self._paused:
+            d = defer.Deferred()
+            self._paused.addCallback(lambda ignored: d.callback(res))
+            return d
+        if self._stopped:
+            raise DownloadStopped("our Consumer called stopProducing()")
+        return res
     def _download_segment(self, res, segnum):
+        log.msg("downloading seg#%d of %d (%d%%)"
+                % (segnum, self._total_segments,
+                   100.0 * segnum / self._total_segments))
         # memory footprint: when the SegmentDownloader finishes pulling down
         # all shares, we have 1*segment_size of usage.
         segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares)
         d = segmentdler.start()
+        # pause before using more memory
+        d.addCallback(self._check_for_pause)
         # while the codec does its job, we hit 2*segment_size
         d.addCallback(lambda (shares, shareids):
                       self._codec.decode(shares, shareids))
         # once the codec is done, we drop back to 1*segment_size, because
         # 'shares' goes out of scope. The memory usage is all in the
         # plaintext now, spread out into a bunch of tiny buffers.
+        # pause/check-for-stop just before writing, to honor stopProducing
+        d.addCallback(self._check_for_pause)
         def _done(buffers):
             # we start by joining all these buffers together into a single
             # string. This makes Output.write easier, since it wants to hash
@@ -571,10 +622,17 @@ class FileDownloader:
         return d
     def _download_tail_segment(self, res, segnum):
+        log.msg("downloading seg#%d of %d (%d%%)"
+                % (segnum, self._total_segments,
+                   100.0 * segnum / self._total_segments))
         segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares)
         d = segmentdler.start()
+        # pause before using more memory
+        d.addCallback(self._check_for_pause)
         d.addCallback(lambda (shares, shareids):
                       self._tail_codec.decode(shares, shareids))
+        # pause/check-for-stop just before writing, to honor stopProducing
+        d.addCallback(self._check_for_pause)
         def _done(buffers):
             # trim off any padding added by the upload side
             segment = "".join(buffers)
@@ -589,11 +647,8 @@ class FileDownloader:
         return d
     def _done(self, res):
+        log.msg("download done [%s]" % idlib.b2a(self._storage_index)[:6])
-        log.msg("computed CRYPTTEXT_HASH: %s" %
-                idlib.b2a(self._output.crypttext_hash))
-        log.msg("computed PLAINTEXT_HASH: %s" %
-                idlib.b2a(self._output.plaintext_hash))
         if self.check_crypttext_hash:
             _assert(self._crypttext_hash == self._output.crypttext_hash,
                     "bad crypttext_hash: computed=%s, expected=%s" %
diff --git a/src/allmydata/ b/src/allmydata/
index a1e99338..3cc6c83c 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -5,6 +5,7 @@ from twisted.application import service, strports, internet
 from twisted.web import static, resource, server, html, http
 from twisted.python import util, log
 from twisted.internet import defer
+from twisted.internet.interfaces import IConsumer
 from nevow import inevow, rend, loaders, appserver, url, tags as T
 from nevow.static import File as nevow_File # TODO: merge with static.File?
 from allmydata.util import fileutil
@@ -271,12 +272,18 @@ class Directory(rend.Page):
             return ""
 class WebDownloadTarget:
-    implements(IDownloadTarget)
+    implements(IDownloadTarget, IConsumer)
     def __init__(self, req, content_type, content_encoding):
         self._req = req
         self._content_type = content_type
         self._content_encoding = content_encoding
         self._opened = False
+        self._producer = None
+    def registerProducer(self, producer, streaming):
+        self._req.registerProducer(producer, streaming)
+    def unregisterProducer(self):
+        self._req.unregisterProducer()
     def open(self, size):
         self._opened = True