from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
-from allmydata.immutable import repairer
+from allmydata.immutable import repairer, upload
from twisted.internet import defer
from twisted.trial import unittest
import random
+from allmydata.test.no_network import GridTestMixin
# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
-DELTA_READS = 10 * READ_LEEWAY # N = 10
+MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
class Verifier(common.ShareManglingMixin, unittest.TestCase):
    def _help_test_verify(self, judgement_func):
d2 = self.filenode.check(Monitor(), verify=True)
def _after_check(checkresults):
after_check_reads = self._count_reads()
- self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads))
+ self.failIf(after_check_reads - before_check_reads > MAX_DELTA_READS, (after_check_reads, before_check_reads))
try:
return judgement_func(checkresults)
except Exception, le:
d.addCallback(_callb)
return d
-class Repairer(common.ShareManglingMixin, unittest.TestCase):
- def test_test_code(self):
- # The following process of stashing the shares, running
- # replace_shares, and asserting that the new set of shares equals the
- # old is more to test this test code than to test the Tahoe code...
- d = defer.succeed(None)
- d.addCallback(self.find_shares)
- stash = [None]
- def _stash_it(res):
- stash[0] = res
- return res
- d.addCallback(_stash_it)
- d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)
-
- def _compare(res):
- oldshares = stash[0]
- self.failUnless(isinstance(oldshares, dict), oldshares)
- self.failUnlessEqual(oldshares, res)
+class Repairer(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
+
+ def upload_and_stash(self):
+ c0 = self.g.clients[0]
+ c1 = self.g.clients[1]
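+        # a tiny segment size makes even this small test file span several
+        # segments, so more of the per-segment code paths get exercised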
+ c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
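+        # an empty convergence secret keeps the upload deterministic from
+        # run to run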
+ d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
+ def _stash_uri(ur):
+ self.uri = ur.uri
+ self.c0_filenode = c0.create_node_from_uri(ur.uri)
+ self.c1_filenode = c1.create_node_from_uri(ur.uri)
+ d.addCallback(_stash_uri)
+ return d
- d.addCallback(self.find_shares)
+ def test_test_code(self):
+ # This test is actually to make sure our test harness works, rather
+ # than testing anything about Tahoe code itself.
+
+ self.basedir = "repairer/Repairer/test_code"
+ self.set_up_grid(num_clients=2)
+ d = self.upload_and_stash()
+
+ d.addCallback(lambda ignored: self.find_shares(self.uri))
+ def _stash_shares(oldshares):
+ self.oldshares = oldshares
+ d.addCallback(_stash_shares)
+ d.addCallback(lambda ignored: self.find_shares(self.uri))
+ def _compare(newshares):
+ self.failUnlessEqual(newshares, self.oldshares)
d.addCallback(_compare)
- d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
- d.addCallback(self.find_shares)
- d.addCallback(lambda x: self.failUnlessEqual(x, {}))
-
- # The following process of deleting 8 of the shares and asserting
- # that you can't download it is more to test this test code than to
- # test the Tahoe code...
- def _then_delete_8(unused=None):
- self.replace_shares(stash[0], storage_index=self.uri.storage_index)
- for i in range(8):
- self._delete_a_share()
- d.addCallback(_then_delete_8)
-
- def _then_download(unused=None):
- self.downloader = self.clients[1].getServiceNamed("downloader")
- d = self.downloader.download_to_data(self.uri)
-
- def _after_download_callb(result):
- self.fail() # should have gotten an errback instead
- return result
- def _after_download_errb(failure):
- failure.trap(NotEnoughSharesError)
- return None # success!
- d.addCallbacks(_after_download_callb, _after_download_errb)
- d.addCallback(_then_download)
-
- # The following process of deleting 8 of the shares and asserting
- # that you can't repair it is more to test this test code than to
- # test the Tahoe code...
- d.addCallback(_then_delete_8)
-
- def _then_repair(unused=None):
- d2 = self.filenode.check_and_repair(Monitor(), verify=False)
- def _after_repair_callb(result):
- self.fail() # should have gotten an errback instead
- return result
- def _after_repair_errb(f):
- f.trap(NotEnoughSharesError)
- return None # success!
- d2.addCallbacks(_after_repair_callb, _after_repair_errb)
- return d2
- d.addCallback(_then_repair)
+ def _delete_8(ignored):
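+            # delete one share by number and seven more directly, exercising
+            # both of the harness's deletion helpers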
+ shnum = self.oldshares[0][0]
+ self.delete_shares_numbered(self.uri, [shnum])
+ for sh in self.oldshares[1:8]:
+ self.delete_share(sh)
+ d.addCallback(_delete_8)
+ d.addCallback(lambda ignored: self.find_shares(self.uri))
+ d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))
+
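+        # only 2 shares remain, fewer than the k=3 needed by the default
+        # 3-of-10 encoding, so download must fail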
+ d.addCallback(lambda ignored:
+ self.shouldFail(NotEnoughSharesError, "then_download",
+ None,
+ self.c1_filenode.download_to_data))
+
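+        # repair also needs at least k shares to reconstruct the file, so
+        # check_and_repair must fail the same way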
+ repair_monitor = Monitor()
+ d.addCallback(lambda ignored:
+ self.shouldFail(NotEnoughSharesError, "then_repair",
+ None,
+ self.c1_filenode.check_and_repair,
+ repair_monitor, verify=False))
+
+ # test share corruption
+ def _test_corrupt(ignored):
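+            # stash each share's bytes so we can confirm below that the
+            # corruptor really changed them on disk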
+ olddata = {}
+ shares = self.find_shares(self.uri)
+ for (shnum, serverid, sharefile) in shares:
+ olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
+ for sh in shares:
+ self.corrupt_share(sh, common._corrupt_uri_extension)
+ for (shnum, serverid, sharefile) in shares:
+ newdata = open(sharefile, "rb").read()
+ self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
+ d.addCallback(_test_corrupt)
+
+        def _remove_all(ignored):
+ for sh in self.find_shares(self.uri):
+ self.delete_share(sh)
+ d.addCallback(_remove_all)
+ d.addCallback(lambda ignored: self.find_shares(self.uri))
+ d.addCallback(lambda shares: self.failUnlessEqual(shares, []))
return d
+ def failUnlessIsInstance(self, x, xtype):
+ self.failUnless(isinstance(x, xtype), x)
+
+ def _count_reads(self):
+ sum_of_read_counts = 0
+ for (i, ss, storedir) in self.iterate_servers():
+ counters = ss.stats_provider.get_stats()['counters']
+ sum_of_read_counts += counters.get('storage_server.read', 0)
+ return sum_of_read_counts
+
+ def _count_allocates(self):
+ sum_of_allocate_counts = 0
+ for (i, ss, storedir) in self.iterate_servers():
+ counters = ss.stats_provider.get_stats()['counters']
+ sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
+ return sum_of_allocate_counts
+
+ def _count_writes(self):
+ sum_of_write_counts = 0
+ for (i, ss, storedir) in self.iterate_servers():
+ counters = ss.stats_provider.get_stats()['counters']
+ sum_of_write_counts += counters.get('storage_server.write', 0)
+ return sum_of_write_counts
+
+ def _stash_counts(self):
+ self.before_repair_reads = self._count_reads()
+ self.before_repair_allocates = self._count_allocates()
+ self.before_repair_writes = self._count_writes()
+
+ def _get_delta_counts(self):
+ delta_reads = self._count_reads() - self.before_repair_reads
+ delta_allocates = self._count_allocates() - self.before_repair_allocates
+ delta_writes = self._count_writes() - self.before_repair_writes
+ return (delta_reads, delta_allocates, delta_writes)
+
+ def failIfBigger(self, x, y):
+ self.failIf(x > y, "%s > %s" % (x, y))
+
def test_repair_from_deletion_of_1(self):
""" Repair replaces a share that got deleted. """
- d = defer.succeed(None)
- d.addCallback(self._delete_a_share, sharenum=2)
-
- def _repair_from_deletion_of_1(unused):
- before_repair_reads = self._count_reads()
- before_repair_allocates = self._count_writes()
-
- d2 = self.filenode.check_and_repair(Monitor(), verify=False)
- def _after_repair(checkandrepairresults):
- assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
- prerepairres = checkandrepairresults.get_pre_repair_results()
- assert isinstance(prerepairres, check_results.CheckResults), prerepairres
- postrepairres = checkandrepairresults.get_post_repair_results()
- assert isinstance(postrepairres, check_results.CheckResults), postrepairres
- after_repair_reads = self._count_reads()
- after_repair_allocates = self._count_writes()
-
- # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
- self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
- self.failIf(after_repair_allocates - before_repair_allocates > DELTA_WRITES_PER_SHARE, (after_repair_allocates, before_repair_allocates))
- self.failIf(prerepairres.is_healthy())
- self.failUnless(postrepairres.is_healthy())
-
- # Now we inspect the filesystem to make sure that it has 10
- # shares.
- shares = self.find_shares()
- self.failIf(len(shares) < 10)
-
- # Now we assert that the verifier reports the file as healthy.
- d3 = self.filenode.check(Monitor(), verify=True)
- def _after_verify(verifyresults):
- self.failUnless(verifyresults.is_healthy())
- d3.addCallback(_after_verify)
-
- # Now we delete seven of the other shares, then try to
- # download the file and assert that it succeeds at
- # downloading and has the right contents. This can't work
- # unless it has already repaired the previously-deleted share
- # #2.
- def _then_delete_7_and_try_a_download(unused=None):
- for sharenum in range(3, 10):
- self._delete_a_share(sharenum=sharenum)
-
- return self._download_and_check_plaintext()
- d3.addCallback(_then_delete_7_and_try_a_download)
- return d3
-
- d2.addCallback(_after_repair)
- return d2
- d.addCallback(_repair_from_deletion_of_1)
+ self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
+ self.set_up_grid(num_clients=2)
+ d = self.upload_and_stash()
+
+ d.addCallback(lambda ignored:
+ self.delete_shares_numbered(self.uri, [2]))
+ d.addCallback(lambda ignored: self._stash_counts())
+ d.addCallback(lambda ignored:
+ self.c0_filenode.check_and_repair(Monitor(),
+ verify=False))
+ def _check_results(crr):
+ self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
+ pre = crr.get_pre_repair_results()
+ self.failUnlessIsInstance(pre, check_results.CheckResults)
+ post = crr.get_post_repair_results()
+ self.failUnlessIsInstance(post, check_results.CheckResults)
+ delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
+ self.failIfBigger(delta_reads, MAX_DELTA_READS)
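+            # only one share was missing, so repair should allocate at most
+            # one share's worth of writes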
+ self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
+ self.failIf(pre.is_healthy())
+ self.failUnless(post.is_healthy())
+
+ # Now we inspect the filesystem to make sure that it has 10
+ # shares.
+ shares = self.find_shares(self.uri)
+ self.failIf(len(shares) < 10)
+ d.addCallback(_check_results)
+
+ d.addCallback(lambda ignored:
+ self.c0_filenode.check(Monitor(), verify=True))
+ d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
+
+ # Now we delete seven of the other shares, then try to download the
+ # file and assert that it succeeds at downloading and has the right
+ # contents. This can't work unless it has already repaired the
+ # previously-deleted share #2.
+
+ d.addCallback(lambda ignored:
+ self.delete_shares_numbered(self.uri, range(3, 10+1)))
+ d.addCallback(lambda ignored: self.c1_filenode.download_to_data())
+ d.addCallback(lambda newdata:
+ self.failUnlessEqual(newdata, common.TEST_DATA))
return d
def test_repair_from_deletion_of_7(self):
""" Repair replaces seven shares that got deleted. """
- shares = self.find_shares()
- self.failIf(len(shares) != 10)
- d = defer.succeed(None)
-
- def _delete_7(unused=None):
- shnums = range(10)
- random.shuffle(shnums)
- for sharenum in shnums[:7]:
- self._delete_a_share(sharenum=sharenum)
- d.addCallback(_delete_7)
-
- def _repair_from_deletion_of_7(unused):
- before_repair_reads = self._count_reads()
- before_repair_allocates = self._count_writes()
-
- d2 = self.filenode.check_and_repair(Monitor(), verify=False)
- def _after_repair(checkandrepairresults):
- assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
- prerepairres = checkandrepairresults.get_pre_repair_results()
- assert isinstance(prerepairres, check_results.CheckResults), prerepairres
- postrepairres = checkandrepairresults.get_post_repair_results()
- assert isinstance(postrepairres, check_results.CheckResults), postrepairres
- after_repair_reads = self._count_reads()
- after_repair_allocates = self._count_writes()
-
- # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
- self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
- self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 7), (after_repair_allocates, before_repair_allocates))
- self.failIf(prerepairres.is_healthy())
- self.failUnless(postrepairres.is_healthy(), postrepairres.data)
-
- # Now we inspect the filesystem to make sure that it has 10
- # shares.
- shares = self.find_shares()
- self.failIf(len(shares) < 10)
-
- # Now we assert that the verifier reports the file as healthy.
- d3 = self.filenode.check(Monitor(), verify=True)
- def _after_verify(verifyresults):
- self.failUnless(verifyresults.is_healthy())
- d3.addCallback(_after_verify)
-
- # Now we delete seven random shares, then try to download the
- # file and assert that it succeeds at downloading and has the
- # right contents.
- def _then_delete_7_and_try_a_download(unused=None):
- for i in range(7):
- self._delete_a_share()
- return self._download_and_check_plaintext()
- d3.addCallback(_then_delete_7_and_try_a_download)
- return d3
-
- d2.addCallback(_after_repair)
- return d2
- d.addCallback(_repair_from_deletion_of_7)
+        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
+ self.set_up_grid(num_clients=2)
+ d = self.upload_and_stash()
+ d.addCallback(lambda ignored:
+ self.delete_shares_numbered(self.uri, range(7)))
+ d.addCallback(lambda ignored: self._stash_counts())
+ d.addCallback(lambda ignored:
+ self.c0_filenode.check_and_repair(Monitor(),
+ verify=False))
+ def _check_results(crr):
+ self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
+ pre = crr.get_pre_repair_results()
+ self.failUnlessIsInstance(pre, check_results.CheckResults)
+ post = crr.get_post_repair_results()
+ self.failUnlessIsInstance(post, check_results.CheckResults)
+ delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
+
+ self.failIfBigger(delta_reads, MAX_DELTA_READS)
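+            # seven shares were missing, so allow up to seven shares' worth
+            # of allocations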
+ self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
+ self.failIf(pre.is_healthy())
+ self.failUnless(post.is_healthy(), post.data)
+
+ # Make sure we really have 10 shares.
+ shares = self.find_shares(self.uri)
+ self.failIf(len(shares) < 10)
+ d.addCallback(_check_results)
+
+ d.addCallback(lambda ignored:
+ self.c0_filenode.check(Monitor(), verify=True))
+ d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
+
+        # Now we delete seven more shares, then try to download the file and
+        # assert that it succeeds at downloading and has the right contents.
+        # This can't work unless the repairer already replaced the
+        # previously-deleted shares.
+
+ d.addCallback(lambda ignored:
+ self.delete_shares_numbered(self.uri, range(3, 10+1)))
+ d.addCallback(lambda ignored: self.c1_filenode.download_to_data())
+ d.addCallback(lambda newdata:
+ self.failUnlessEqual(newdata, common.TEST_DATA))
return d
# why is test_repair_from_corruption_of_1 disabled? Read on:
# and will probably cause subsequent unrelated tests to fail too (due to
# "unclean reactor" problems).
#
+ # In addition, I (warner) have recently refactored the rest of this class
+    # to use the much-faster no_network.GridTestMixin, so this test needs to
+    # be updated before it can run again.
+ #
# So we're turning this test off until we've done one or more of the
# following:
# * remove some of these limitations
# The "* 2" in reads is because you might read a whole share
# before figuring out that it is corrupted. It might be
# possible to make this delta reads number a little tighter.
- self.failIf(after_repair_reads - before_repair_reads > (DELTA_READS * 2), (after_repair_reads, before_repair_reads))
+ self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
# The "* 2" in writes is because each server has two shares,
# and it is reasonable for repairer to conclude that there
# are two shares that it should upload, if the server fails