# to free up the RAM
return None
        def _get_blocks(vrbp):
-            ds = []
-            for blocknum in range(veup.num_segments):
+            def _get_block(ign, blocknum):
                 db = vrbp.get_block(blocknum)
                 db.addCallback(_discard_result)
-                ds.append(db)
-            # this gatherResults will fire once every block of this share has
-            # been downloaded and verified, or else it will errback.
-            return deferredutil.gatherResults(ds)
+                return db
+            # NOTE(review): 'veup' is a closure variable of the enclosing
+            # function (not visible in this hunk) -- confirm it remains in
+            # scope after this change.
+            # Chaining every fetch onto a single Deferred serializes the
+            # block downloads for this share: at most one get_block() call
+            # is outstanding per share at any time, instead of all blocks
+            # at once (the memory blow-up this patch fixes).
+            dbs = defer.succeed(None)
+            for blocknum in range(veup.num_segments):
+                dbs.addCallback(_get_block, blocknum)
+            # The Deferred we return will fire after every block of this
+            # share has been downloaded and verified successfully, or else it
+            # will errback as soon as the first error is observed.
+            return dbs
+
d.addCallback(_get_blocks)
# if none of those errbacked, the blocks (and the hashes above them)
import simplejson
from twisted.trial import unittest
+from twisted.internet import defer
from allmydata import check_results, uri
from allmydata.web import check_results as web_check_results
from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
d.addCallback(lambda ign: self.failUnless(really_did_break))
return d
+
+class CounterHolder(object):
+    # Shared mutable counters for tracking concurrent block fetches.
+    # One instance is shared by every MockVRBP the test creates, so the
+    # counts aggregate fetches across all shares.
+    def __init__(self):
+        # number of get_block() calls currently outstanding
+        self._num_active_block_fetches = 0
+        # high-water mark ever reached by _num_active_block_fetches
+        self._max_active_block_fetches = 0
+
+from allmydata.immutable.checker import ValidatedReadBucketProxy
+class MockVRBP(ValidatedReadBucketProxy):
+    # A ValidatedReadBucketProxy that records, via a shared CounterHolder,
+    # how many block fetches are in flight at the same time.
+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder):
+        ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
+                                          share_hash_tree, num_blocks,
+                                          block_size, share_size)
+        self.counterholder = counterholder
+
+    def get_block(self, blocknum):
+        # Bump the active count and update the high-water mark *before*
+        # delegating to the real implementation, so overlap is observed.
+        self.counterholder._num_active_block_fetches += 1
+        if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches:
+            self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches
+        d = ValidatedReadBucketProxy.get_block(self, blocknum)
+        def _mark_no_longer_active(res):
+            # addBoth: decrement on success or failure, and pass the
+            # result (or Failure) through unchanged.
+            self.counterholder._num_active_block_fetches -= 1
+            return res
+        d.addBoth(_mark_no_longer_active)
+        return d
+
+class TooParallel(GridTestMixin, unittest.TestCase):
+    # bug #1395: immutable verifier was aggressively parallelized, checking
+    # all blocks of all shares at the same time, blowing our memory budget
+    # and crashing with MemoryErrors on >1GB files.
+    #
+    # NOTE(review): GridTestMixin, Data and Monitor are assumed to be
+    # imported elsewhere in this test module -- not visible in this hunk.
+
+    def test_immutable(self):
+        # Check that the verifier fetches at most one block per share at a
+        # time (shares themselves are still verified in parallel).
+        import allmydata.immutable.checker
+        origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy
+
+        self.basedir = "checker/TooParallel/immutable"
+
+        # If any code asks to instantiate a ValidatedReadBucketProxy,
+        # we give them a MockVRBP which is configured to use our
+        # CounterHolder.
+        counterholder = CounterHolder()
+        def make_mock_VRBP(*args, **kwargs):
+            return MockVRBP(counterholder=counterholder, *args, **kwargs)
+        allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP
+
+        d = defer.succeed(None)
+        def _start(ign):
+            self.set_up_grid(num_servers=4)
+            self.c0 = self.g.clients[0]
+            # k=1/n=4 puts one share on each of the 4 servers, and
+            # max_segment_size=5 chops the 400-byte file into 80 blocks
+            # per share, so there is plenty of opportunity for parallelism.
+            self.c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
+                                                    "happy": 4,
+                                                    "n": 4,
+                                                    "max_segment_size": 5,
+                                                    }
+            self.uris = {}
+            DATA = "data" * 100 # 400/5 = 80 blocks
+            return self.c0.upload(Data(DATA, convergence=""))
+        d.addCallback(_start)
+        def _do_check(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            # verify=True makes the checker download and check every block
+            return n.check(Monitor(), verify=True)
+        d.addCallback(_do_check)
+        def _check(cr):
+            # the verifier works on all 4 shares in parallel, but only
+            # fetches one block from each share at a time, so we expect to
+            # see 4 parallel fetches
+            self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
+        d.addCallback(_check)
+        def _clean_up(res):
+            # addBoth: always restore the real class, even on failure
+            allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP
+            return res
+        d.addBoth(_clean_up)
+        return d
+    # trial honors a per-test 'timeout' attribute (seconds)
+    test_immutable.timeout = 10