From: Zooko O'Whielacronx Date: Tue, 2 Aug 2011 06:37:03 +0000 (-0700) Subject: verifier: serialize the fetching of blocks within a share so that we don't use too... X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=f426e82287c11b11f5c464ff9377a86917d235aa;p=tahoe-lafs%2Ftahoe-lafs.git verifier: serialize the fetching of blocks within a share so that we don't use too much RAM Shares are still verified in parallel, but within a share, don't request a block until the previous block has been verified and the memory we used to hold it has been freed up. Patch originally due to Brian. This version has a mockery-patchery-style test which is "low tech" (it implements the patching inline in the test code instead of using an extension of the mock.patch() function from the mock library) and which unpatches in case of exception. fixes #1395 --- diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py index b43d6b15..6731e94c 100644 --- a/src/allmydata/immutable/checker.py +++ b/src/allmydata/immutable/checker.py @@ -616,14 +616,18 @@ class Checker(log.PrefixingLogMixin): # to free up the RAM return None def _get_blocks(vrbp): - ds = [] - for blocknum in range(veup.num_segments): + def _get_block(ign, blocknum): db = vrbp.get_block(blocknum) db.addCallback(_discard_result) - ds.append(db) - # this gatherResults will fire once every block of this share has - # been downloaded and verified, or else it will errback. - return deferredutil.gatherResults(ds) + return db + dbs = defer.succeed(None) + for blocknum in range(veup.num_segments): + dbs.addCallback(_get_block, blocknum) + # The Deferred we return will fire after every block of this + # share has been downloaded and verified successfully, or else it + # will errback as soon as the first error is observed. + return dbs + d.addCallback(_get_blocks) # if none of those errbacked, the blocks (and the hashes above them) diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index b302c10d..5632bff2 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -1,6 +1,7 @@ import simplejson from twisted.trial import unittest +from twisted.internet import defer from allmydata import check_results, uri from allmydata.web import check_results as web_check_results from allmydata.storage_client import StorageFarmBroker, NativeStorageServer @@ -319,3 +320,76 @@ class AddLease(GridTestMixin, unittest.TestCase): d.addCallback(lambda ign: self.failUnless(really_did_break)) return d + +class CounterHolder(object): + def __init__(self): + self._num_active_block_fetches = 0 + self._max_active_block_fetches = 0 + +from allmydata.immutable.checker import ValidatedReadBucketProxy +class MockVRBP(ValidatedReadBucketProxy): + def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder): + ValidatedReadBucketProxy.__init__(self, sharenum, bucket, + share_hash_tree, num_blocks, + block_size, share_size) + self.counterholder = counterholder + + def get_block(self, blocknum): + self.counterholder._num_active_block_fetches += 1 + if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches: + self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches + d = ValidatedReadBucketProxy.get_block(self, blocknum) + def _mark_no_longer_active(res): + self.counterholder._num_active_block_fetches -= 1 + return res + d.addBoth(_mark_no_longer_active) + return d + +class TooParallel(GridTestMixin, unittest.TestCase): + # bug #1395: immutable verifier was aggressively parallized, checking all + # blocks of all shares at the same time, blowing our memory budget and + # crashing with MemoryErrors on >1GB files. + + def test_immutable(self): + import allmydata.immutable.checker + origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy + + self.basedir = "checker/TooParallel/immutable" + + # If any code asks to instantiate a ValidatedReadBucketProxy, + # we give them a MockVRBP which is configured to use our + # CounterHolder. + counterholder = CounterHolder() + def make_mock_VRBP(*args, **kwargs): + return MockVRBP(counterholder=counterholder, *args, **kwargs) + allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP + + d = defer.succeed(None) + def _start(ign): + self.set_up_grid(num_servers=4) + self.c0 = self.g.clients[0] + self.c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1, + "happy": 4, + "n": 4, + "max_segment_size": 5, + } + self.uris = {} + DATA = "data" * 100 # 400/5 = 80 blocks + return self.c0.upload(Data(DATA, convergence="")) + d.addCallback(_start) + def _do_check(ur): + n = self.c0.create_node_from_uri(ur.uri) + return n.check(Monitor(), verify=True) + d.addCallback(_do_check) + def _check(cr): + # the verifier works on all 4 shares in parallel, but only + # fetches one block from each share at a time, so we expect to + # see 4 parallel fetches + self.failUnlessEqual(counterholder._max_active_block_fetches, 4) + d.addCallback(_check) + def _clean_up(res): + allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP + return res + d.addBoth(_clean_up) + return d + test_immutable.timeout = 10