import simplejson
from twisted.trial import unittest
+from twisted.internet import defer
from allmydata import check_results, uri
from allmydata.web import check_results as web_check_results
from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
d.addCallback(lambda ign: self.failUnless(really_did_break))
return d
+
+class CounterHolder(object):
+ def __init__(self):
+ self._num_active_block_fetches = 0
+ self._max_active_block_fetches = 0
+
+from allmydata.immutable.checker import ValidatedReadBucketProxy
+class MockVRBP(ValidatedReadBucketProxy):
+ def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder):
+ ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
+ share_hash_tree, num_blocks,
+ block_size, share_size)
+ self.counterholder = counterholder
+
+ def get_block(self, blocknum):
+ self.counterholder._num_active_block_fetches += 1
+ if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches:
+ self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches
+ d = ValidatedReadBucketProxy.get_block(self, blocknum)
+ def _mark_no_longer_active(res):
+ self.counterholder._num_active_block_fetches -= 1
+ return res
+ d.addBoth(_mark_no_longer_active)
+ return d
+
+class TooParallel(GridTestMixin, unittest.TestCase):
+ # bug #1395: immutable verifier was aggressively parallized, checking all
+ # blocks of all shares at the same time, blowing our memory budget and
+ # crashing with MemoryErrors on >1GB files.
+
+ def test_immutable(self):
+ import allmydata.immutable.checker
+ origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy
+
+ self.basedir = "checker/TooParallel/immutable"
+
+ # If any code asks to instantiate a ValidatedReadBucketProxy,
+ # we give them a MockVRBP which is configured to use our
+ # CounterHolder.
+ counterholder = CounterHolder()
+ def make_mock_VRBP(*args, **kwargs):
+ return MockVRBP(counterholder=counterholder, *args, **kwargs)
+ allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP
+
+ d = defer.succeed(None)
+ def _start(ign):
+ self.set_up_grid(num_servers=4)
+ self.c0 = self.g.clients[0]
+ self.c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
+ "happy": 4,
+ "n": 4,
+ "max_segment_size": 5,
+ }
+ self.uris = {}
+ DATA = "data" * 100 # 400/5 = 80 blocks
+ return self.c0.upload(Data(DATA, convergence=""))
+ d.addCallback(_start)
+ def _do_check(ur):
+ n = self.c0.create_node_from_uri(ur.uri)
+ return n.check(Monitor(), verify=True)
+ d.addCallback(_do_check)
+ def _check(cr):
+ # the verifier works on all 4 shares in parallel, but only
+ # fetches one block from each share at a time, so we expect to
+ # see 4 parallel fetches
+ self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
+ d.addCallback(_check)
+ def _clean_up(res):
+ allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP
+ return res
+ d.addBoth(_clean_up)
+ return d
+ test_immutable.timeout = 10