test_repairer: change Repairer to use much-faster no_network.GridTestMixin. As a...
authorBrian Warner <warner@lothar.com>
Tue, 24 Feb 2009 00:42:27 +0000 (17:42 -0700)
committerBrian Warner <warner@lothar.com>
Tue, 24 Feb 2009 00:42:27 +0000 (17:42 -0700)
src/allmydata/test/no_network.py
src/allmydata/test/test_repairer.py

index d75cc04abd975335fc708757fe849e375f17cb27..633accbaac394e910d57f6549a7f5a95b8197cba 100644 (file)
@@ -264,3 +264,15 @@ class GridTestMixin:
                     pass
         return sorted(shares)
 
+    def delete_share(self, (shnum, serverid, sharefile)):
+        os.unlink(sharefile)
+
+    def delete_shares_numbered(self, uri, shnums):
+        for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
+            if i_shnum in shnums:
+                os.unlink(i_sharefile)
+
+    def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function):
+        sharedata = open(sharefile, "rb").read()
+        corruptdata = corruptor_function(sharedata)
+        open(sharefile, "wb").write(corruptdata)
index e8032f238c9811c0cb66e5f4a452f75502b02866..9ff7928e3a5642d25b612ceac3158c8f59e9bf3b 100644 (file)
@@ -2,15 +2,16 @@ from allmydata.test import common
 from allmydata.monitor import Monitor
 from allmydata import check_results
 from allmydata.interfaces import NotEnoughSharesError
-from allmydata.immutable import repairer
+from allmydata.immutable import repairer, upload
 from twisted.internet import defer
 from twisted.trial import unittest
 import random
+from no_network import GridTestMixin
 
 # We'll allow you to pass this test even if you trigger eighteen times as
 # many disk reads and block fetches as would be optimal.
 READ_LEEWAY = 18
-DELTA_READS = 10 * READ_LEEWAY # N = 10
+MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
 
 class Verifier(common.ShareManglingMixin, unittest.TestCase):
     def test_check_without_verify(self):
@@ -66,7 +67,7 @@ class Verifier(common.ShareManglingMixin, unittest.TestCase):
             d2 = self.filenode.check(Monitor(), verify=True)
             def _after_check(checkresults):
                 after_check_reads = self._count_reads()
-                self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads))
+                self.failIf(after_check_reads - before_check_reads > MAX_DELTA_READS, (after_check_reads, before_check_reads))
                 try:
                     return judgement_func(checkresults)
                 except Exception, le:
@@ -431,184 +432,207 @@ class DownUpConnector(unittest.TestCase):
         d.addCallback(_callb)
         return d
 
-class Repairer(common.ShareManglingMixin, unittest.TestCase):
-    def test_test_code(self):
-        # The following process of stashing the shares, running
-        # replace_shares, and asserting that the new set of shares equals the
-        # old is more to test this test code than to test the Tahoe code...
-        d = defer.succeed(None)
-        d.addCallback(self.find_shares)
-        stash = [None]
-        def _stash_it(res):
-            stash[0] = res
-            return res
-        d.addCallback(_stash_it)
-        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)
-
-        def _compare(res):
-            oldshares = stash[0]
-            self.failUnless(isinstance(oldshares, dict), oldshares)
-            self.failUnlessEqual(oldshares, res)
+class Repairer(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
+
+    def upload_and_stash(self):
+        c0 = self.g.clients[0]
+        c1 = self.g.clients[1]
+        c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
+        d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
+        def _stash_uri(ur):
+            self.uri = ur.uri
+            self.c0_filenode = c0.create_node_from_uri(ur.uri)
+            self.c1_filenode = c1.create_node_from_uri(ur.uri)
+        d.addCallback(_stash_uri)
+        return d
 
-        d.addCallback(self.find_shares)
+    def test_test_code(self):
+        # This test is actually to make sure our test harness works, rather
+        # than testing anything about Tahoe code itself.
+
+        self.basedir = "repairer/Repairer/test_code"
+        self.set_up_grid(num_clients=2)
+        d = self.upload_and_stash()
+
+        d.addCallback(lambda ignored: self.find_shares(self.uri))
+        def _stash_shares(oldshares):
+            self.oldshares = oldshares
+        d.addCallback(_stash_shares)
+        d.addCallback(lambda ignored: self.find_shares(self.uri))
+        def _compare(newshares):
+            self.failUnlessEqual(newshares, self.oldshares)
         d.addCallback(_compare)
 
-        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
-        d.addCallback(self.find_shares)
-        d.addCallback(lambda x: self.failUnlessEqual(x, {}))
-
-        # The following process of deleting 8 of the shares and asserting
-        # that you can't download it is more to test this test code than to
-        # test the Tahoe code...
-        def _then_delete_8(unused=None):
-            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
-            for i in range(8):
-                self._delete_a_share()
-        d.addCallback(_then_delete_8)
-
-        def _then_download(unused=None):
-            self.downloader = self.clients[1].getServiceNamed("downloader")
-            d = self.downloader.download_to_data(self.uri)
-
-            def _after_download_callb(result):
-                self.fail() # should have gotten an errback instead
-                return result
-            def _after_download_errb(failure):
-                failure.trap(NotEnoughSharesError)
-                return None # success!
-            d.addCallbacks(_after_download_callb, _after_download_errb)
-        d.addCallback(_then_download)
-
-        # The following process of deleting 8 of the shares and asserting
-        # that you can't repair it is more to test this test code than to
-        # test the Tahoe code...
-        d.addCallback(_then_delete_8)
-
-        def _then_repair(unused=None):
-            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
-            def _after_repair_callb(result):
-                self.fail() # should have gotten an errback instead
-                return result
-            def _after_repair_errb(f):
-                f.trap(NotEnoughSharesError)
-                return None # success!
-            d2.addCallbacks(_after_repair_callb, _after_repair_errb)
-            return d2
-        d.addCallback(_then_repair)
+        def _delete_8(ignored):
+            shnum = self.oldshares[0][0]
+            self.delete_shares_numbered(self.uri, [shnum])
+            for sh in self.oldshares[1:8]:
+                self.delete_share(sh)
+        d.addCallback(_delete_8)
+        d.addCallback(lambda ignored: self.find_shares(self.uri))
+        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))
+
+        d.addCallback(lambda ignored:
+                      self.shouldFail(NotEnoughSharesError, "then_download",
+                                      None,
+                                      self.c1_filenode.download_to_data))
+
+        repair_monitor = Monitor()
+        d.addCallback(lambda ignored:
+                      self.shouldFail(NotEnoughSharesError, "then_repair",
+                                      None,
+                                      self.c1_filenode.check_and_repair,
+                                      repair_monitor, verify=False))
+
+        # test share corruption
+        def _test_corrupt(ignored):
+            olddata = {}
+            shares = self.find_shares(self.uri)
+            for (shnum, serverid, sharefile) in shares:
+                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
+            for sh in shares:
+                self.corrupt_share(sh, common._corrupt_uri_extension)
+            for (shnum, serverid, sharefile) in shares:
+                newdata = open(sharefile, "rb").read()
+                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
+        d.addCallback(_test_corrupt)
+
+        def _remove_all(shares):
+            for sh in self.find_shares(self.uri):
+                self.delete_share(sh)
+        d.addCallback(_remove_all)
+        d.addCallback(lambda ignored: self.find_shares(self.uri))
+        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))
 
         return d
 
+    def failUnlessIsInstance(self, x, xtype):
+        self.failUnless(isinstance(x, xtype), x)
+
+    def _count_reads(self):
+        sum_of_read_counts = 0
+        for (i, ss, storedir) in self.iterate_servers():
+            counters = ss.stats_provider.get_stats()['counters']
+            sum_of_read_counts += counters.get('storage_server.read', 0)
+        return sum_of_read_counts
+
+    def _count_allocates(self):
+        sum_of_allocate_counts = 0
+        for (i, ss, storedir) in self.iterate_servers():
+            counters = ss.stats_provider.get_stats()['counters']
+            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
+        return sum_of_allocate_counts
+
+    def _count_writes(self):
+        sum_of_write_counts = 0
+        for (i, ss, storedir) in self.iterate_servers():
+            counters = ss.stats_provider.get_stats()['counters']
+            sum_of_write_counts += counters.get('storage_server.write', 0)
+        return sum_of_write_counts
+
+    def _stash_counts(self):
+        self.before_repair_reads = self._count_reads()
+        self.before_repair_allocates = self._count_allocates()
+        self.before_repair_writes = self._count_writes()
+
+    def _get_delta_counts(self):
+        delta_reads = self._count_reads() - self.before_repair_reads
+        delta_allocates = self._count_allocates() - self.before_repair_allocates
+        delta_writes = self._count_writes() - self.before_repair_writes
+        return (delta_reads, delta_allocates, delta_writes)
+
+    def failIfBigger(self, x, y):
+        self.failIf(x > y, "%s > %s" % (x, y))
+
     def test_repair_from_deletion_of_1(self):
         """ Repair replaces a share that got deleted. """
-        d = defer.succeed(None)
-        d.addCallback(self._delete_a_share, sharenum=2)
-
-        def _repair_from_deletion_of_1(unused):
-            before_repair_reads = self._count_reads()
-            before_repair_allocates = self._count_writes()
-
-            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
-            def _after_repair(checkandrepairresults):
-                assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
-                prerepairres = checkandrepairresults.get_pre_repair_results()
-                assert isinstance(prerepairres, check_results.CheckResults), prerepairres
-                postrepairres = checkandrepairresults.get_post_repair_results()
-                assert isinstance(postrepairres, check_results.CheckResults), postrepairres
-                after_repair_reads = self._count_reads()
-                after_repair_allocates = self._count_writes()
-
-                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
-                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
-                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_WRITES_PER_SHARE, (after_repair_allocates, before_repair_allocates))
-                self.failIf(prerepairres.is_healthy())
-                self.failUnless(postrepairres.is_healthy())
-
-                # Now we inspect the filesystem to make sure that it has 10
-                # shares.
-                shares = self.find_shares()
-                self.failIf(len(shares) < 10)
-
-                # Now we assert that the verifier reports the file as healthy.
-                d3 = self.filenode.check(Monitor(), verify=True)
-                def _after_verify(verifyresults):
-                    self.failUnless(verifyresults.is_healthy())
-                d3.addCallback(_after_verify)
-
-                # Now we delete seven of the other shares, then try to
-                # download the file and assert that it succeeds at
-                # downloading and has the right contents. This can't work
-                # unless it has already repaired the previously-deleted share
-                # #2.
-                def _then_delete_7_and_try_a_download(unused=None):
-                    for sharenum in range(3, 10):
-                        self._delete_a_share(sharenum=sharenum)
-
-                    return self._download_and_check_plaintext()
-                d3.addCallback(_then_delete_7_and_try_a_download)
-                return d3
-
-            d2.addCallback(_after_repair)
-            return d2
-        d.addCallback(_repair_from_deletion_of_1)
+        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
+        self.set_up_grid(num_clients=2)
+        d = self.upload_and_stash()
+
+        d.addCallback(lambda ignored:
+                      self.delete_shares_numbered(self.uri, [2]))
+        d.addCallback(lambda ignored: self._stash_counts())
+        d.addCallback(lambda ignored:
+                      self.c0_filenode.check_and_repair(Monitor(),
+                                                        verify=False))
+        def _check_results(crr):
+            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
+            pre = crr.get_pre_repair_results()
+            self.failUnlessIsInstance(pre, check_results.CheckResults)
+            post = crr.get_post_repair_results()
+            self.failUnlessIsInstance(post, check_results.CheckResults)
+            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
+            self.failIfBigger(delta_reads, MAX_DELTA_READS)
+            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
+            self.failIf(pre.is_healthy())
+            self.failUnless(post.is_healthy())
+
+            # Now we inspect the filesystem to make sure that it has 10
+            # shares.
+            shares = self.find_shares(self.uri)
+            self.failIf(len(shares) < 10)
+        d.addCallback(_check_results)
+
+        d.addCallback(lambda ignored:
+                      self.c0_filenode.check(Monitor(), verify=True))
+        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
+
+        # Now we delete seven of the other shares, then try to download the
+        # file and assert that it succeeds at downloading and has the right
+        # contents. This can't work unless it has already repaired the
+        # previously-deleted share #2.
+
+        d.addCallback(lambda ignored:
+                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
+        d.addCallback(lambda ignored: self.c1_filenode.download_to_data())
+        d.addCallback(lambda newdata:
+                      self.failUnlessEqual(newdata, common.TEST_DATA))
         return d
 
     def test_repair_from_deletion_of_7(self):
         """ Repair replaces seven shares that got deleted. """
-        shares = self.find_shares()
-        self.failIf(len(shares) != 10)
-        d = defer.succeed(None)
-
-        def _delete_7(unused=None):
-            shnums = range(10)
-            random.shuffle(shnums)
-            for sharenum in shnums[:7]:
-                self._delete_a_share(sharenum=sharenum)
-        d.addCallback(_delete_7)
-
-        def _repair_from_deletion_of_7(unused):
-            before_repair_reads = self._count_reads()
-            before_repair_allocates = self._count_writes()
-
-            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
-            def _after_repair(checkandrepairresults):
-                assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
-                prerepairres = checkandrepairresults.get_pre_repair_results()
-                assert isinstance(prerepairres, check_results.CheckResults), prerepairres
-                postrepairres = checkandrepairresults.get_post_repair_results()
-                assert isinstance(postrepairres, check_results.CheckResults), postrepairres
-                after_repair_reads = self._count_reads()
-                after_repair_allocates = self._count_writes()
-
-                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
-                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
-                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 7), (after_repair_allocates, before_repair_allocates))
-                self.failIf(prerepairres.is_healthy())
-                self.failUnless(postrepairres.is_healthy(), postrepairres.data)
-
-                # Now we inspect the filesystem to make sure that it has 10
-                # shares.
-                shares = self.find_shares()
-                self.failIf(len(shares) < 10)
-
-                # Now we assert that the verifier reports the file as healthy.
-                d3 = self.filenode.check(Monitor(), verify=True)
-                def _after_verify(verifyresults):
-                    self.failUnless(verifyresults.is_healthy())
-                d3.addCallback(_after_verify)
-
-                # Now we delete seven random shares, then try to download the
-                # file and assert that it succeeds at downloading and has the
-                # right contents.
-                def _then_delete_7_and_try_a_download(unused=None):
-                    for i in range(7):
-                        self._delete_a_share()
-                    return self._download_and_check_plaintext()
-                d3.addCallback(_then_delete_7_and_try_a_download)
-                return d3
-
-            d2.addCallback(_after_repair)
-            return d2
-        d.addCallback(_repair_from_deletion_of_7)
+        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
+        self.set_up_grid(num_clients=2)
+        d = self.upload_and_stash()
+        d.addCallback(lambda ignored:
+                      self.delete_shares_numbered(self.uri, range(7)))
+        d.addCallback(lambda ignored: self._stash_counts())
+        d.addCallback(lambda ignored:
+                      self.c0_filenode.check_and_repair(Monitor(),
+                                                        verify=False))
+        def _check_results(crr):
+            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
+            pre = crr.get_pre_repair_results()
+            self.failUnlessIsInstance(pre, check_results.CheckResults)
+            post = crr.get_post_repair_results()
+            self.failUnlessIsInstance(post, check_results.CheckResults)
+            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
+
+            self.failIfBigger(delta_reads, MAX_DELTA_READS)
+            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
+            self.failIf(pre.is_healthy())
+            self.failUnless(post.is_healthy(), post.data)
+
+            # Make sure we really have 10 shares.
+            shares = self.find_shares(self.uri)
+            self.failIf(len(shares) < 10)
+        d.addCallback(_check_results)
+
+        d.addCallback(lambda ignored:
+                      self.c0_filenode.check(Monitor(), verify=True))
+        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
+
+        # Now we delete seven of the other shares, then try to download the
+        # file and assert that it succeeds at downloading and has the right
+        # contents. This can't work unless it has already repaired the
+        # previously-deleted share #2.
+
+        d.addCallback(lambda ignored:
+                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
+        d.addCallback(lambda ignored: self.c1_filenode.download_to_data())
+        d.addCallback(lambda newdata:
+                      self.failUnlessEqual(newdata, common.TEST_DATA))
         return d
 
     # why is test_repair_from_corruption_of_1 disabled? Read on:
@@ -638,6 +662,10 @@ class Repairer(common.ShareManglingMixin, unittest.TestCase):
     # and will probably cause subsequent unrelated tests to fail too (due to
     # "unclean reactor" problems).
     #
+    # In addition, I (warner) have recently refactored the rest of this class
+    # to use the much-faster no_network.GridTestMixin, so this tests needs to
+    # be updated before it will be able to run again.
+    #
     # So we're turning this test off until we've done one or more of the
     # following:
     #  * remove some of these limitations
@@ -672,7 +700,7 @@ class Repairer(common.ShareManglingMixin, unittest.TestCase):
                 # The "* 2" in reads is because you might read a whole share
                 # before figuring out that it is corrupted. It might be
                 # possible to make this delta reads number a little tighter.
-                self.failIf(after_repair_reads - before_repair_reads > (DELTA_READS * 2), (after_repair_reads, before_repair_reads))
+                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                 # The "* 2" in writes is because each server has two shares,
                 # and it is reasonable for repairer to conclude that there
                 # are two shares that it should upload, if the server fails