From f426e82287c11b11f5c464ff9377a86917d235aa Mon Sep 17 00:00:00 2001
From: Zooko O'Whielacronx <zooko@zooko.com>
Date: Mon, 1 Aug 2011 23:37:03 -0700
Subject: [PATCH] verifier: serialize the fetching of blocks within a share so
 that we don't use too much RAM

Shares are still verified in parallel, but within a share, don't request a
block until the previous block has been verified and the memory we used to hold
it has been freed up.

Patch originally due to Brian. This version has a mockery-patchery-style test
which is "low tech" (it implements the patching inline in the test code instead
of using an extension of the mock.patch() function from the mock library) and
which unpatches in case of exception.

fixes #1395
---
 src/allmydata/immutable/checker.py | 16 ++++---
 src/allmydata/test/test_checker.py | 74 ++++++++++++++++++++++++++++++
 2 files changed, 84 insertions(+), 6 deletions(-)

diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py
index b43d6b15..6731e94c 100644
--- a/src/allmydata/immutable/checker.py
+++ b/src/allmydata/immutable/checker.py
@@ -616,14 +616,18 @@ class Checker(log.PrefixingLogMixin):
             # to free up the RAM
             return None
         def _get_blocks(vrbp):
-            ds = []
-            for blocknum in range(veup.num_segments):
+            def _get_block(ign, blocknum):
                 db = vrbp.get_block(blocknum)
                 db.addCallback(_discard_result)
-                ds.append(db)
-            # this gatherResults will fire once every block of this share has
-            # been downloaded and verified, or else it will errback.
-            return deferredutil.gatherResults(ds)
+                return db
+            dbs = defer.succeed(None)
+            for blocknum in range(veup.num_segments):
+                dbs.addCallback(_get_block, blocknum)
+                # The Deferred we return will fire after every block of this
+                # share has been downloaded and verified successfully, or else it
+                # will errback as soon as the first error is observed.
+                return dbs
+
         d.addCallback(_get_blocks)
 
         # if none of those errbacked, the blocks (and the hashes above them)
diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py
index b302c10d..5632bff2 100644
--- a/src/allmydata/test/test_checker.py
+++ b/src/allmydata/test/test_checker.py
@@ -1,6 +1,7 @@
 
 import simplejson
 from twisted.trial import unittest
+from twisted.internet import defer
 from allmydata import check_results, uri
 from allmydata.web import check_results as web_check_results
 from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
@@ -319,3 +320,76 @@ class AddLease(GridTestMixin, unittest.TestCase):
 
         d.addCallback(lambda ign: self.failUnless(really_did_break))
         return d
+
+class CounterHolder(object):
+    def __init__(self):
+        self._num_active_block_fetches = 0
+        self._max_active_block_fetches = 0
+
+from allmydata.immutable.checker import ValidatedReadBucketProxy
+class MockVRBP(ValidatedReadBucketProxy):
+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder):
+        ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
+                                          share_hash_tree, num_blocks,
+                                          block_size, share_size)
+        self.counterholder = counterholder
+
+    def get_block(self, blocknum):
+        self.counterholder._num_active_block_fetches += 1
+        if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches:
+            self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches
+        d = ValidatedReadBucketProxy.get_block(self, blocknum)
+        def _mark_no_longer_active(res):
+            self.counterholder._num_active_block_fetches -= 1
+            return res
+        d.addBoth(_mark_no_longer_active)
+        return d
+
+class TooParallel(GridTestMixin, unittest.TestCase):
+    # bug #1395: immutable verifier was aggressively parallized, checking all
+    # blocks of all shares at the same time, blowing our memory budget and
+    # crashing with MemoryErrors on >1GB files.
+
+    def test_immutable(self):
+        import allmydata.immutable.checker
+        origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy
+
+        self.basedir = "checker/TooParallel/immutable"
+
+        # If any code asks to instantiate a ValidatedReadBucketProxy,
+        # we give them a MockVRBP which is configured to use our
+        # CounterHolder.
+        counterholder = CounterHolder()
+        def make_mock_VRBP(*args, **kwargs):
+            return MockVRBP(counterholder=counterholder, *args, **kwargs)
+        allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP
+
+        d = defer.succeed(None)
+        def _start(ign):
+            self.set_up_grid(num_servers=4)
+            self.c0 = self.g.clients[0]
+            self.c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
+                                               "happy": 4,
+                                               "n": 4,
+                                               "max_segment_size": 5,
+                                               }
+            self.uris = {}
+            DATA = "data" * 100 # 400/5 = 80 blocks
+            return self.c0.upload(Data(DATA, convergence=""))
+        d.addCallback(_start)
+        def _do_check(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            return n.check(Monitor(), verify=True)
+        d.addCallback(_do_check)
+        def _check(cr):
+            # the verifier works on all 4 shares in parallel, but only
+            # fetches one block from each share at a time, so we expect to
+            # see 4 parallel fetches
+            self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
+        d.addCallback(_check)
+        def _clean_up(res):
+            allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP
+            return res
+        d.addBoth(_clean_up)
+        return d
+    test_immutable.timeout = 10
-- 
2.45.2