From: Brian Warner Date: Tue, 24 Feb 2009 00:42:27 +0000 (-0700) Subject: test_repairer: change Repairer to use much-faster no_network.GridTestMixin. As a... X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/simplejson/running.html?a=commitdiff_plain;h=2be729b1e442edd467e68a492af54b7c3b820442;p=tahoe-lafs%2Ftahoe-lafs.git test_repairer: change Repairer to use much-faster no_network.GridTestMixin. As a side-effect, fix what I think was a bug: some of the assert-minimal-effort-expended checks were mixing write counts and allocate counts --- diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index d75cc04a..633accba 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -264,3 +264,15 @@ class GridTestMixin: pass return sorted(shares) + def delete_share(self, (shnum, serverid, sharefile)): + os.unlink(sharefile) + + def delete_shares_numbered(self, uri, shnums): + for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri): + if i_shnum in shnums: + os.unlink(i_sharefile) + + def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function): + sharedata = open(sharefile, "rb").read() + corruptdata = corruptor_function(sharedata) + open(sharefile, "wb").write(corruptdata) diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py index e8032f23..9ff7928e 100644 --- a/src/allmydata/test/test_repairer.py +++ b/src/allmydata/test/test_repairer.py @@ -2,15 +2,16 @@ from allmydata.test import common from allmydata.monitor import Monitor from allmydata import check_results from allmydata.interfaces import NotEnoughSharesError -from allmydata.immutable import repairer +from allmydata.immutable import repairer, upload from twisted.internet import defer from twisted.trial import unittest import random +from no_network import GridTestMixin # We'll allow you to pass this test even if you trigger eighteen times as # many disk reads and block fetches as would be optimal. READ_LEEWAY = 18 -DELTA_READS = 10 * READ_LEEWAY # N = 10 +MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10 class Verifier(common.ShareManglingMixin, unittest.TestCase): def test_check_without_verify(self): @@ -66,7 +67,7 @@ class Verifier(common.ShareManglingMixin, unittest.TestCase): d2 = self.filenode.check(Monitor(), verify=True) def _after_check(checkresults): after_check_reads = self._count_reads() - self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads)) + self.failIf(after_check_reads - before_check_reads > MAX_DELTA_READS, (after_check_reads, before_check_reads)) try: return judgement_func(checkresults) except Exception, le: @@ -431,184 +432,207 @@ class DownUpConnector(unittest.TestCase): d.addCallback(_callb) return d -class Repairer(common.ShareManglingMixin, unittest.TestCase): - def test_test_code(self): - # The following process of stashing the shares, running - # replace_shares, and asserting that the new set of shares equals the - # old is more to test this test code than to test the Tahoe code... - d = defer.succeed(None) - d.addCallback(self.find_shares) - stash = [None] - def _stash_it(res): - stash[0] = res - return res - d.addCallback(_stash_it) - d.addCallback(self.replace_shares, storage_index=self.uri.storage_index) - - def _compare(res): - oldshares = stash[0] - self.failUnless(isinstance(oldshares, dict), oldshares) - self.failUnlessEqual(oldshares, res) +class Repairer(GridTestMixin, unittest.TestCase, common.ShouldFailMixin): + + def upload_and_stash(self): + c0 = self.g.clients[0] + c1 = self.g.clients[1] + c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12 + d = c0.upload(upload.Data(common.TEST_DATA, convergence="")) + def _stash_uri(ur): + self.uri = ur.uri + self.c0_filenode = c0.create_node_from_uri(ur.uri) + self.c1_filenode = c1.create_node_from_uri(ur.uri) + d.addCallback(_stash_uri) + return d - d.addCallback(self.find_shares) + def test_test_code(self): + # This test is actually to make sure our test harness works, rather + # than testing anything about Tahoe code itself. + + self.basedir = "repairer/Repairer/test_code" + self.set_up_grid(num_clients=2) + d = self.upload_and_stash() + + d.addCallback(lambda ignored: self.find_shares(self.uri)) + def _stash_shares(oldshares): + self.oldshares = oldshares + d.addCallback(_stash_shares) + d.addCallback(lambda ignored: self.find_shares(self.uri)) + def _compare(newshares): + self.failUnlessEqual(newshares, self.oldshares) d.addCallback(_compare) - d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index)) - d.addCallback(self.find_shares) - d.addCallback(lambda x: self.failUnlessEqual(x, {})) - - # The following process of deleting 8 of the shares and asserting - # that you can't download it is more to test this test code than to - # test the Tahoe code... - def _then_delete_8(unused=None): - self.replace_shares(stash[0], storage_index=self.uri.storage_index) - for i in range(8): - self._delete_a_share() - d.addCallback(_then_delete_8) - - def _then_download(unused=None): - self.downloader = self.clients[1].getServiceNamed("downloader") - d = self.downloader.download_to_data(self.uri) - - def _after_download_callb(result): - self.fail() # should have gotten an errback instead - return result - def _after_download_errb(failure): - failure.trap(NotEnoughSharesError) - return None # success! - d.addCallbacks(_after_download_callb, _after_download_errb) - d.addCallback(_then_download) - - # The following process of deleting 8 of the shares and asserting - # that you can't repair it is more to test this test code than to - # test the Tahoe code... - d.addCallback(_then_delete_8) - - def _then_repair(unused=None): - d2 = self.filenode.check_and_repair(Monitor(), verify=False) - def _after_repair_callb(result): - self.fail() # should have gotten an errback instead - return result - def _after_repair_errb(f): - f.trap(NotEnoughSharesError) - return None # success! - d2.addCallbacks(_after_repair_callb, _after_repair_errb) - return d2 - d.addCallback(_then_repair) + def _delete_8(ignored): + shnum = self.oldshares[0][0] + self.delete_shares_numbered(self.uri, [shnum]) + for sh in self.oldshares[1:8]: + self.delete_share(sh) + d.addCallback(_delete_8) + d.addCallback(lambda ignored: self.find_shares(self.uri)) + d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2)) + + d.addCallback(lambda ignored: + self.shouldFail(NotEnoughSharesError, "then_download", + None, + self.c1_filenode.download_to_data)) + + repair_monitor = Monitor() + d.addCallback(lambda ignored: + self.shouldFail(NotEnoughSharesError, "then_repair", + None, + self.c1_filenode.check_and_repair, + repair_monitor, verify=False)) + + # test share corruption + def _test_corrupt(ignored): + olddata = {} + shares = self.find_shares(self.uri) + for (shnum, serverid, sharefile) in shares: + olddata[ (shnum, serverid) ] = open(sharefile, "rb").read() + for sh in shares: + self.corrupt_share(sh, common._corrupt_uri_extension) + for (shnum, serverid, sharefile) in shares: + newdata = open(sharefile, "rb").read() + self.failIfEqual(olddata[ (shnum, serverid) ], newdata) + d.addCallback(_test_corrupt) + + def _remove_all(shares): + for sh in self.find_shares(self.uri): + self.delete_share(sh) + d.addCallback(_remove_all) + d.addCallback(lambda ignored: self.find_shares(self.uri)) + d.addCallback(lambda shares: self.failUnlessEqual(shares, [])) return d + def failUnlessIsInstance(self, x, xtype): + self.failUnless(isinstance(x, xtype), x) + + def _count_reads(self): + sum_of_read_counts = 0 + for (i, ss, storedir) in self.iterate_servers(): + counters = ss.stats_provider.get_stats()['counters'] + sum_of_read_counts += counters.get('storage_server.read', 0) + return sum_of_read_counts + + def _count_allocates(self): + sum_of_allocate_counts = 0 + for (i, ss, storedir) in self.iterate_servers(): + counters = ss.stats_provider.get_stats()['counters'] + sum_of_allocate_counts += counters.get('storage_server.allocate', 0) + return sum_of_allocate_counts + + def _count_writes(self): + sum_of_write_counts = 0 + for (i, ss, storedir) in self.iterate_servers(): + counters = ss.stats_provider.get_stats()['counters'] + sum_of_write_counts += counters.get('storage_server.write', 0) + return sum_of_write_counts + + def _stash_counts(self): + self.before_repair_reads = self._count_reads() + self.before_repair_allocates = self._count_allocates() + self.before_repair_writes = self._count_writes() + + def _get_delta_counts(self): + delta_reads = self._count_reads() - self.before_repair_reads + delta_allocates = self._count_allocates() - self.before_repair_allocates + delta_writes = self._count_writes() - self.before_repair_writes + return (delta_reads, delta_allocates, delta_writes) + + def failIfBigger(self, x, y): + self.failIf(x > y, "%s > %s" % (x, y)) + def test_repair_from_deletion_of_1(self): """ Repair replaces a share that got deleted. """ - d = defer.succeed(None) - d.addCallback(self._delete_a_share, sharenum=2) - - def _repair_from_deletion_of_1(unused): - before_repair_reads = self._count_reads() - before_repair_allocates = self._count_writes() - - d2 = self.filenode.check_and_repair(Monitor(), verify=False) - def _after_repair(checkandrepairresults): - assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults - prerepairres = checkandrepairresults.get_pre_repair_results() - assert isinstance(prerepairres, check_results.CheckResults), prerepairres - postrepairres = checkandrepairresults.get_post_repair_results() - assert isinstance(postrepairres, check_results.CheckResults), postrepairres - after_repair_reads = self._count_reads() - after_repair_allocates = self._count_writes() - - # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates - self.failIf(after_repair_reads - before_repair_reads > DELTA_READS) - self.failIf(after_repair_allocates - before_repair_allocates > DELTA_WRITES_PER_SHARE, (after_repair_allocates, before_repair_allocates)) - self.failIf(prerepairres.is_healthy()) - self.failUnless(postrepairres.is_healthy()) - - # Now we inspect the filesystem to make sure that it has 10 - # shares. - shares = self.find_shares() - self.failIf(len(shares) < 10) - - # Now we assert that the verifier reports the file as healthy. - d3 = self.filenode.check(Monitor(), verify=True) - def _after_verify(verifyresults): - self.failUnless(verifyresults.is_healthy()) - d3.addCallback(_after_verify) - - # Now we delete seven of the other shares, then try to - # download the file and assert that it succeeds at - # downloading and has the right contents. This can't work - # unless it has already repaired the previously-deleted share - # #2. - def _then_delete_7_and_try_a_download(unused=None): - for sharenum in range(3, 10): - self._delete_a_share(sharenum=sharenum) - - return self._download_and_check_plaintext() - d3.addCallback(_then_delete_7_and_try_a_download) - return d3 - - d2.addCallback(_after_repair) - return d2 - d.addCallback(_repair_from_deletion_of_1) + self.basedir = "repairer/Repairer/repair_from_deletion_of_1" + self.set_up_grid(num_clients=2) + d = self.upload_and_stash() + + d.addCallback(lambda ignored: + self.delete_shares_numbered(self.uri, [2])) + d.addCallback(lambda ignored: self._stash_counts()) + d.addCallback(lambda ignored: + self.c0_filenode.check_and_repair(Monitor(), + verify=False)) + def _check_results(crr): + self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults) + pre = crr.get_pre_repair_results() + self.failUnlessIsInstance(pre, check_results.CheckResults) + post = crr.get_post_repair_results() + self.failUnlessIsInstance(post, check_results.CheckResults) + delta_reads, delta_allocates, delta_writes = self._get_delta_counts() + self.failIfBigger(delta_reads, MAX_DELTA_READS) + self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE) + self.failIf(pre.is_healthy()) + self.failUnless(post.is_healthy()) + + # Now we inspect the filesystem to make sure that it has 10 + # shares. + shares = self.find_shares(self.uri) + self.failIf(len(shares) < 10) + d.addCallback(_check_results) + + d.addCallback(lambda ignored: + self.c0_filenode.check(Monitor(), verify=True)) + d.addCallback(lambda vr: self.failUnless(vr.is_healthy())) + + # Now we delete seven of the other shares, then try to download the + # file and assert that it succeeds at downloading and has the right + # contents. This can't work unless it has already repaired the + # previously-deleted share #2. + + d.addCallback(lambda ignored: + self.delete_shares_numbered(self.uri, range(3, 10+1))) + d.addCallback(lambda ignored: self.c1_filenode.download_to_data()) + d.addCallback(lambda newdata: + self.failUnlessEqual(newdata, common.TEST_DATA)) return d def test_repair_from_deletion_of_7(self): """ Repair replaces seven shares that got deleted. """ - shares = self.find_shares() - self.failIf(len(shares) != 10) - d = defer.succeed(None) - - def _delete_7(unused=None): - shnums = range(10) - random.shuffle(shnums) - for sharenum in shnums[:7]: - self._delete_a_share(sharenum=sharenum) - d.addCallback(_delete_7) - - def _repair_from_deletion_of_7(unused): - before_repair_reads = self._count_reads() - before_repair_allocates = self._count_writes() - - d2 = self.filenode.check_and_repair(Monitor(), verify=False) - def _after_repair(checkandrepairresults): - assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults - prerepairres = checkandrepairresults.get_pre_repair_results() - assert isinstance(prerepairres, check_results.CheckResults), prerepairres - postrepairres = checkandrepairresults.get_post_repair_results() - assert isinstance(postrepairres, check_results.CheckResults), postrepairres - after_repair_reads = self._count_reads() - after_repair_allocates = self._count_writes() - - # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates - self.failIf(after_repair_reads - before_repair_reads > DELTA_READS) - self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 7), (after_repair_allocates, before_repair_allocates)) - self.failIf(prerepairres.is_healthy()) - self.failUnless(postrepairres.is_healthy(), postrepairres.data) - - # Now we inspect the filesystem to make sure that it has 10 - # shares. - shares = self.find_shares() - self.failIf(len(shares) < 10) - - # Now we assert that the verifier reports the file as healthy. - d3 = self.filenode.check(Monitor(), verify=True) - def _after_verify(verifyresults): - self.failUnless(verifyresults.is_healthy()) - d3.addCallback(_after_verify) - - # Now we delete seven random shares, then try to download the - # file and assert that it succeeds at downloading and has the - # right contents. - def _then_delete_7_and_try_a_download(unused=None): - for i in range(7): - self._delete_a_share() - return self._download_and_check_plaintext() - d3.addCallback(_then_delete_7_and_try_a_download) - return d3 - - d2.addCallback(_after_repair) - return d2 - d.addCallback(_repair_from_deletion_of_7) + self.basedir = "repairer/Repairer/repair_from_deletion_of_1" + self.set_up_grid(num_clients=2) + d = self.upload_and_stash() + d.addCallback(lambda ignored: + self.delete_shares_numbered(self.uri, range(7))) + d.addCallback(lambda ignored: self._stash_counts()) + d.addCallback(lambda ignored: + self.c0_filenode.check_and_repair(Monitor(), + verify=False)) + def _check_results(crr): + self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults) + pre = crr.get_pre_repair_results() + self.failUnlessIsInstance(pre, check_results.CheckResults) + post = crr.get_post_repair_results() + self.failUnlessIsInstance(post, check_results.CheckResults) + delta_reads, delta_allocates, delta_writes = self._get_delta_counts() + + self.failIfBigger(delta_reads, MAX_DELTA_READS) + self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7)) + self.failIf(pre.is_healthy()) + self.failUnless(post.is_healthy(), post.data) + + # Make sure we really have 10 shares. + shares = self.find_shares(self.uri) + self.failIf(len(shares) < 10) + d.addCallback(_check_results) + + d.addCallback(lambda ignored: + self.c0_filenode.check(Monitor(), verify=True)) + d.addCallback(lambda vr: self.failUnless(vr.is_healthy())) + + # Now we delete seven of the other shares, then try to download the + # file and assert that it succeeds at downloading and has the right + # contents. This can't work unless it has already repaired the + # previously-deleted share #2. + + d.addCallback(lambda ignored: + self.delete_shares_numbered(self.uri, range(3, 10+1))) + d.addCallback(lambda ignored: self.c1_filenode.download_to_data()) + d.addCallback(lambda newdata: + self.failUnlessEqual(newdata, common.TEST_DATA)) return d # why is test_repair_from_corruption_of_1 disabled? Read on: @@ -638,6 +662,10 @@ class Repairer(common.ShareManglingMixin, unittest.TestCase): # and will probably cause subsequent unrelated tests to fail too (due to # "unclean reactor" problems). # + # In addition, I (warner) have recently refactored the rest of this class + # to use the much-faster no_network.GridTestMixin, so this tests needs to + # be updated before it will be able to run again. + # # So we're turning this test off until we've done one or more of the # following: # * remove some of these limitations @@ -672,7 +700,7 @@ class Repairer(common.ShareManglingMixin, unittest.TestCase): # The "* 2" in reads is because you might read a whole share # before figuring out that it is corrupted. It might be # possible to make this delta reads number a little tighter. - self.failIf(after_repair_reads - before_repair_reads > (DELTA_READS * 2), (after_repair_reads, before_repair_reads)) + self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads)) # The "* 2" in writes is because each server has two shares, # and it is reasonable for repairer to conclude that there # are two shares that it should upload, if the server fails