]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
verifier: serialize the fetching of blocks within a share so that we don't use too...
authorZooko O'Whielacronx <zooko@zooko.com>
Tue, 2 Aug 2011 06:37:03 +0000 (23:37 -0700)
committerZooko O'Whielacronx <zooko@zooko.com>
Tue, 2 Aug 2011 06:37:03 +0000 (23:37 -0700)
Shares are still verified in parallel, but within a share, don't request a
block until the previous block has been verified and the memory we used to hold
it has been freed up.

Patch originally due to Brian. This version has a mockery-patchery-style test
which is "low tech" (it implements the patching inline in the test code instead
of using an extension of the mock.patch() function from the mock library) and
which unpatches in case of exception.

fixes #1395

src/allmydata/immutable/checker.py
src/allmydata/test/test_checker.py

index b43d6b154819ca56d3c64992cd53e2b29b63f558..6731e94cbfb2b623cedb44423c86e06fbcf7129c 100644 (file)
@@ -616,14 +616,18 @@ class Checker(log.PrefixingLogMixin):
             # to free up the RAM
             return None
         def _get_blocks(vrbp):
-            ds = []
-            for blocknum in range(veup.num_segments):
+            def _get_block(ign, blocknum):
                 db = vrbp.get_block(blocknum)
                 db.addCallback(_discard_result)
-                ds.append(db)
-            # this gatherResults will fire once every block of this share has
-            # been downloaded and verified, or else it will errback.
-            return deferredutil.gatherResults(ds)
+                return db
+            dbs = defer.succeed(None)
+            for blocknum in range(veup.num_segments):
+                dbs.addCallback(_get_block, blocknum)
+                # The Deferred we return will fire after every block of this
+                # share has been downloaded and verified successfully, or else it
+                # will errback as soon as the first error is observed.
+                return dbs
+
         d.addCallback(_get_blocks)
 
         # if none of those errbacked, the blocks (and the hashes above them)
index b302c10d612d8a8f6d6fa385c19fa0cbb25b8601..5632bff293d0e86a87e7e83c1f80e95e40e1583b 100644 (file)
@@ -1,6 +1,7 @@
 
 import simplejson
 from twisted.trial import unittest
+from twisted.internet import defer
 from allmydata import check_results, uri
 from allmydata.web import check_results as web_check_results
 from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
@@ -319,3 +320,76 @@ class AddLease(GridTestMixin, unittest.TestCase):
 
         d.addCallback(lambda ign: self.failUnless(really_did_break))
         return d
+
+class CounterHolder(object):
+    def __init__(self):
+        self._num_active_block_fetches = 0
+        self._max_active_block_fetches = 0
+
+from allmydata.immutable.checker import ValidatedReadBucketProxy
+class MockVRBP(ValidatedReadBucketProxy):
+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder):
+        ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
+                                          share_hash_tree, num_blocks,
+                                          block_size, share_size)
+        self.counterholder = counterholder
+
+    def get_block(self, blocknum):
+        self.counterholder._num_active_block_fetches += 1
+        if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches:
+            self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches
+        d = ValidatedReadBucketProxy.get_block(self, blocknum)
+        def _mark_no_longer_active(res):
+            self.counterholder._num_active_block_fetches -= 1
+            return res
+        d.addBoth(_mark_no_longer_active)
+        return d
+
+class TooParallel(GridTestMixin, unittest.TestCase):
+    # bug #1395: immutable verifier was aggressively parallized, checking all
+    # blocks of all shares at the same time, blowing our memory budget and
+    # crashing with MemoryErrors on >1GB files.
+
+    def test_immutable(self):
+        import allmydata.immutable.checker
+        origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy
+
+        self.basedir = "checker/TooParallel/immutable"
+
+        # If any code asks to instantiate a ValidatedReadBucketProxy,
+        # we give them a MockVRBP which is configured to use our
+        # CounterHolder.
+        counterholder = CounterHolder()
+        def make_mock_VRBP(*args, **kwargs):
+            return MockVRBP(counterholder=counterholder, *args, **kwargs)
+        allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP
+
+        d = defer.succeed(None)
+        def _start(ign):
+            self.set_up_grid(num_servers=4)
+            self.c0 = self.g.clients[0]
+            self.c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
+                                               "happy": 4,
+                                               "n": 4,
+                                               "max_segment_size": 5,
+                                               }
+            self.uris = {}
+            DATA = "data" * 100 # 400/5 = 80 blocks
+            return self.c0.upload(Data(DATA, convergence=""))
+        d.addCallback(_start)
+        def _do_check(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            return n.check(Monitor(), verify=True)
+        d.addCallback(_do_check)
+        def _check(cr):
+            # the verifier works on all 4 shares in parallel, but only
+            # fetches one block from each share at a time, so we expect to
+            # see 4 parallel fetches
+            self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
+        d.addCallback(_check)
+        def _clean_up(res):
+            allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP
+            return res
+        d.addBoth(_clean_up)
+        return d
+    test_immutable.timeout = 10