From: Zooko O'Whielacronx Date: Fri, 2 Jan 2009 23:49:41 +0000 (-0700) Subject: trivial: a few improvements to in-line doc and code, and renaming of test/test_immuta... X-Git-Url: https://git.rkrishnan.org/%5B/index.php?a=commitdiff_plain;h=cc70c163baa4669e9666c8891f6e2c8c23d1b3d1;p=tahoe-lafs%2Ftahoe-lafs.git trivial: a few improvements to in-line doc and code, and renaming of test/test_immutable_checker.py to test/test_immutable.py That file currently tests checker and verifier and repairer, and will soon also test downloader. --- diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index daf9052c..cae5864b 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -802,7 +802,7 @@ class FileDownloader(log.PrefixingLogMixin): self._results.timings["peer_selection"] = now - self._started if len(self._share_buckets) < self._uri.needed_shares: - raise NotEnoughSharesError + raise NotEnoughSharesError(len(self._share_buckets), self._uri.needed_shares) #for s in self._share_vbuckets.values(): # for vb in s: diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index e89eb512..d6aef394 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -344,6 +344,9 @@ class BucketReader(Referenceable): self.storage_index = storage_index self.shnum = shnum + def __repr__(self): + return "<%s %s %s>" % (self.__class__.__name__, base32.b2a_l(self.storage_index[:8], 60), self.shnum) + def remote_read(self, offset, length): start = time.time() data = self._share_file.read_share_data(offset, length) diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py new file mode 100644 index 00000000..5614a54a --- /dev/null +++ b/src/allmydata/test/test_immutable.py @@ -0,0 +1,680 @@ + +from allmydata.test.common import SystemTestMixin, ShareManglingMixin +from allmydata.monitor import Monitor +from allmydata.interfaces import IURI, NotEnoughSharesError +from allmydata.immutable import upload +from allmydata.util import log +from twisted.internet import defer +from twisted.trial import unittest +import random, struct +import common_util as testutil + +TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1) + +def corrupt_field(data, offset, size, debug=False): + if random.random() < 0.5: + newdata = testutil.flip_one_bit(data, offset, size) + if debug: + log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size])) + return newdata + else: + newval = testutil.insecurerandstr(size) + if debug: + log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval)) + return data[:offset]+newval+data[offset+size:] + +def _corrupt_file_version_number(data): + """ Scramble the file data -- the share file version number have one bit flipped or else + will be changed to a random value.""" + return corrupt_field(data, 0x00, 4) + +def _corrupt_size_of_file_data(data): + """ Scramble the file data -- the field showing the size of the share data within the file + will be set to one smaller. """ + return corrupt_field(data, 0x04, 4) + +def _corrupt_sharedata_version_number(data): + """ Scramble the file data -- the share data version number will have one bit flipped or + else will be changed to a random value, but not 1 or 2.""" + return corrupt_field(data, 0x0c, 4) + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + newsharevernum = sharevernum + while newsharevernum in (1, 2): + newsharevernum = random.randrange(0, 2**32) + newsharevernumbytes = struct.pack(">l", newsharevernum) + return data[:0x0c] + newsharevernumbytes + data[0x0c+4:] + +def _corrupt_sharedata_version_number_to_known_version(data): + """ Scramble the file data -- the share data version number will + be changed to 2 if it is 1 or else to 1 if it is 2.""" + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + newsharevernum = 2 + else: + newsharevernum = 1 + newsharevernumbytes = struct.pack(">l", newsharevernum) + return data[:0x0c] + newsharevernumbytes + data[0x0c+4:] + +def _corrupt_segment_size(data): + """ Scramble the file data -- the field showing the size of the segment will have one + bit flipped or else be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + return corrupt_field(data, 0x0c+0x04, 4, debug=True) + else: + return corrupt_field(data, 0x0c+0x04, 8, debug=True) + +def _corrupt_size_of_sharedata(data): + """ Scramble the file data -- the field showing the size of the data within the share + data will have one bit flipped or else will be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + return corrupt_field(data, 0x0c+0x08, 4) + else: + return corrupt_field(data, 0x0c+0x0c, 8) + +def _corrupt_offset_of_sharedata(data): + """ Scramble the file data -- the field showing the offset of the data within the share + data will have one bit flipped or else be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + return corrupt_field(data, 0x0c+0x0c, 4) + else: + return corrupt_field(data, 0x0c+0x14, 8) + +def _corrupt_offset_of_ciphertext_hash_tree(data): + """ Scramble the file data -- the field showing the offset of the ciphertext hash tree + within the share data will have one bit flipped or else be changed to a random value. + """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + return corrupt_field(data, 0x0c+0x14, 4, debug=True) + else: + return corrupt_field(data, 0x0c+0x24, 8, debug=True) + +def _corrupt_offset_of_block_hashes(data): + """ Scramble the file data -- the field showing the offset of the block hash tree within + the share data will have one bit flipped or else will be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + return corrupt_field(data, 0x0c+0x18, 4) + else: + return corrupt_field(data, 0x0c+0x2c, 8) + +def _corrupt_offset_of_share_hashes(data): + """ Scramble the file data -- the field showing the offset of the share hash tree within + the share data will have one bit flipped or else will be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + return corrupt_field(data, 0x0c+0x1c, 4) + else: + return corrupt_field(data, 0x0c+0x34, 8) + +def _corrupt_offset_of_uri_extension(data): + """ Scramble the file data -- the field showing the offset of the uri extension will + have one bit flipped or else will be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + return corrupt_field(data, 0x0c+0x20, 4) + else: + return corrupt_field(data, 0x0c+0x3c, 8) + +def _corrupt_share_data(data): + """ Scramble the file data -- the field containing the share data itself will have one + bit flipped or else will be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0] + + return corrupt_field(data, 0x0c+0x24, sharedatasize) + else: + sharedatasize = struct.unpack(">Q", data[0x0c+0x08:0x0c+0x0c+8])[0] + + return corrupt_field(data, 0x0c+0x44, sharedatasize) + +def _corrupt_crypttext_hash_tree(data): + """ Scramble the file data -- the field containing the crypttext hash tree will have one + bit flipped or else will be changed to a random value. + """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0] + blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0] + else: + crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0] + blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0] + + return corrupt_field(data, crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset) + +def _corrupt_block_hashes(data): + """ Scramble the file data -- the field containing the block hash tree will have one bit + flipped or else will be changed to a random value. + """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0] + sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0] + else: + blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0] + sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0] + + return corrupt_field(data, blockhashesoffset, sharehashesoffset-blockhashesoffset) + +def _corrupt_share_hashes(data): + """ Scramble the file data -- the field containing the share hash chain will have one + bit flipped or else will be changed to a random value. + """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0] + uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0] + else: + sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0] + uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0] + + return corrupt_field(data, sharehashesoffset, uriextoffset-sharehashesoffset) + +def _corrupt_length_of_uri_extension(data): + """ Scramble the file data -- the field showing the length of the uri extension will + have one bit flipped or else will be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0] + return corrupt_field(data, uriextoffset, 4) + else: + uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0] + return corrupt_field(data, uriextoffset, 8) + +def _corrupt_uri_extension(data): + """ Scramble the file data -- the field containing the uri extension will have one bit + flipped or else will be changed to a random value. """ + sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] + assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." + if sharevernum == 1: + uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0] + uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0] + else: + uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0] + uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0] + + return corrupt_field(data, uriextoffset, uriextlen) + +class Test(ShareManglingMixin, unittest.TestCase): + def setUp(self): + # Set self.basedir to a temp dir which has the name of the current test method in its + # name. + self.basedir = self.mktemp() + + d = defer.maybeDeferred(SystemTestMixin.setUp, self) + d.addCallback(lambda x: self.set_up_nodes()) + + def _upload_a_file(ignored): + d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence="")) + def _after_upload(u): + self.uri = IURI(u.uri) + return self.clients[0].create_node_from_uri(self.uri) + d2.addCallback(_after_upload) + return d2 + d.addCallback(_upload_a_file) + + def _stash_it(filenode): + self.filenode = filenode + d.addCallback(_stash_it) + return d + + def _download_and_check_plaintext(self, unused=None): + self.downloader = self.clients[1].getServiceNamed("downloader") + d = self.downloader.download_to_data(self.uri) + + def _after_download(result): + self.failUnlessEqual(result, TEST_DATA) + d.addCallback(_after_download) + return d + + def _delete_a_share(self, unused=None, sharenum=None): + """ Delete one share. """ + + shares = self.find_shares() + ks = shares.keys() + if sharenum is not None: + k = [ key for key in shares.keys() if key[1] == sharenum ][0] + else: + k = random.choice(ks) + del shares[k] + self.replace_shares(shares, storage_index=self.uri.storage_index) + + return unused + + def test_test_code(self): + # The following process of stashing the shares, running + # replace_shares, and asserting that the new set of shares equals the + # old is more to test this test code than to test the Tahoe code... + d = defer.succeed(None) + d.addCallback(self.find_shares) + stash = [None] + def _stash_it(res): + stash[0] = res + return res + d.addCallback(_stash_it) + d.addCallback(self.replace_shares, storage_index=self.uri.storage_index) + + def _compare(res): + oldshares = stash[0] + self.failUnless(isinstance(oldshares, dict), oldshares) + self.failUnlessEqual(oldshares, res) + + d.addCallback(self.find_shares) + d.addCallback(_compare) + + d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index)) + d.addCallback(self.find_shares) + d.addCallback(lambda x: self.failUnlessEqual(x, {})) + + # The following process of deleting 8 of the shares and asserting that you can't + # download it is more to test this test code than to test the Tahoe code... + def _then_delete_8(unused=None): + self.replace_shares(stash[0], storage_index=self.uri.storage_index) + for i in range(8): + self._delete_a_share() + d.addCallback(_then_delete_8) + + def _then_download(unused=None): + self.downloader = self.clients[1].getServiceNamed("downloader") + d = self.downloader.download_to_data(self.uri) + + def _after_download_callb(result): + self.fail() # should have gotten an errback instead + return result + def _after_download_errb(failure): + failure.trap(NotEnoughSharesError) + return None # success! + d.addCallbacks(_after_download_callb, _after_download_errb) + d.addCallback(_then_download) + + # The following process of leaving 8 of the shares deleted and asserting that you can't + # repair it is more to test this test code than to test the Tahoe code... + def _then_repair(unused=None): + d2 = self.filenode.check_and_repair(Monitor(), verify=False) + def _after_repair(checkandrepairresults): + prerepairres = checkandrepairresults.get_pre_repair_results() + postrepairres = checkandrepairresults.get_post_repair_results() + self.failIf(prerepairres.is_healthy()) + self.failIf(postrepairres.is_healthy()) + d2.addCallback(_after_repair) + return d2 + d.addCallback(_then_repair) + return d + + def _count_reads(self): + sum_of_read_counts = 0 + for client in self.clients: + counters = client.stats_provider.get_stats()['counters'] + sum_of_read_counts += counters.get('storage_server.read', 0) + return sum_of_read_counts + + def _count_allocates(self): + sum_of_allocate_counts = 0 + for client in self.clients: + counters = client.stats_provider.get_stats()['counters'] + sum_of_allocate_counts += counters.get('storage_server.allocate', 0) + return sum_of_allocate_counts + + def _corrupt_a_random_share(self, unused, corruptor_func): + """ Exactly one share on disk will be corrupted by corruptor_func. """ + shares = self.find_shares() + ks = shares.keys() + k = random.choice(ks) + + shares[k] = corruptor_func(shares[k]) + + self.replace_shares(shares, storage_index=self.uri.storage_index) + + def test_check_without_verify(self): + """ Check says the file is healthy when none of the shares have been touched. It says + that the file is unhealthy when all of them have been removed. It doesn't use any reads. + """ + d = defer.succeed(self.filenode) + def _check1(filenode): + before_check_reads = self._count_reads() + + d2 = filenode.check(Monitor(), verify=False) + def _after_check(checkresults): + after_check_reads = self._count_reads() + self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads) + self.failUnless(checkresults.is_healthy()) + + d2.addCallback(_after_check) + return d2 + d.addCallback(_check1) + + d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index)) + def _check2(ignored): + before_check_reads = self._count_reads() + d2 = self.filenode.check(Monitor(), verify=False) + + def _after_check(checkresults): + after_check_reads = self._count_reads() + self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads) + self.failIf(checkresults.is_healthy()) + + d2.addCallback(_after_check) + return d2 + d.addCallback(_check2) + + return d + + def test_check_with_verify(self): + """ Check says the file is healthy when none of the shares have been touched. It says + that the file is unhealthy if any field of any share has been corrupted. It doesn't use + more than twice as many reads as it needs. """ + LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and blocks sends as would be optimal. + DELTA_READS = 10 * LEEWAY # N = 10 + d = defer.succeed(self.filenode) + def _check_pristine(filenode): + before_check_reads = self._count_reads() + + d2 = filenode.check(Monitor(), verify=True) + def _after_check(checkresults): + after_check_reads = self._count_reads() + self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads, DELTA_READS)) + self.failUnless(checkresults.is_healthy()) + + d2.addCallback(_after_check) + return d2 + d.addCallback(_check_pristine) + + d.addCallback(self.find_shares) + stash = [None] + def _stash_it(res): + stash[0] = res + return res + d.addCallback(_stash_it) + + def _check_after_feckless_corruption(ignored, corruptor_func): + # Corruption which has no effect -- bits of the share file that are unused. + before_check_reads = self._count_reads() + d2 = self.filenode.check(Monitor(), verify=True) + + def _after_check(checkresults): + after_check_reads = self._count_reads() + self.failIf(after_check_reads - before_check_reads > DELTA_READS) + self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func)) + data = checkresults.get_data() + self.failUnless(data['count-shares-good'] == 10, data) + self.failUnless(len(data['sharemap']) == 10, data) + self.failUnless(data['count-shares-needed'] == 3, data) + self.failUnless(data['count-shares-expected'] == 10, data) + self.failUnless(data['count-good-share-hosts'] == 5, data) + self.failUnless(len(data['servers-responding']) == 5, data) + self.failUnless(len(data['list-corrupt-shares']) == 0, data) + + d2.addCallback(_after_check) + return d2 + + def _put_it_all_back(ignored): + self.replace_shares(stash[0], storage_index=self.uri.storage_index) + return ignored + + for corruptor_func in ( + _corrupt_size_of_file_data, + _corrupt_size_of_sharedata, + _corrupt_segment_size, + ): + d.addCallback(self._corrupt_a_random_share, corruptor_func) + d.addCallback(_check_after_feckless_corruption, corruptor_func=corruptor_func) + d.addCallback(_put_it_all_back) + + def _check_after_server_visible_corruption(ignored, corruptor_func): + # Corruption which is detected by the server means that the server will send you + # back a Failure in response to get_bucket instead of giving you the share data. + before_check_reads = self._count_reads() + d2 = self.filenode.check(Monitor(), verify=True) + + def _after_check(checkresults): + after_check_reads = self._count_reads() + self.failIf(after_check_reads - before_check_reads > DELTA_READS) + self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func)) + data = checkresults.get_data() + # The server might fail to serve up its other share as well as the corrupted + # one, so count-shares-good could be 8 or 9. + self.failUnless(data['count-shares-good'] in (8, 9), data) + self.failUnless(len(data['sharemap']) in (8, 9,), data) + self.failUnless(data['count-shares-needed'] == 3, data) + self.failUnless(data['count-shares-expected'] == 10, data) + # The server may have served up the non-corrupted share, or it may not have, so + # the checker could have detected either 4 or 5 good servers. + self.failUnless(data['count-good-share-hosts'] in (4, 5), data) + self.failUnless(len(data['servers-responding']) in (4, 5), data) + # If the server served up the other share, then the checker should consider it good, else it should + # not. + self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data) + self.failUnless(len(data['list-corrupt-shares']) == 0, data) + + d2.addCallback(_after_check) + return d2 + + for corruptor_func in ( + _corrupt_file_version_number, + ): + d.addCallback(self._corrupt_a_random_share, corruptor_func) + d.addCallback(_check_after_server_visible_corruption, corruptor_func=corruptor_func) + d.addCallback(_put_it_all_back) + + def _check_after_share_incompatibility(ignored, corruptor_func): + # Corruption which means the share is indistinguishable from a share of an + # incompatible version. + before_check_reads = self._count_reads() + d2 = self.filenode.check(Monitor(), verify=True) + + def _after_check(checkresults): + after_check_reads = self._count_reads() + self.failIf(after_check_reads - before_check_reads > DELTA_READS) + self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func)) + data = checkresults.get_data() + self.failUnless(data['count-shares-good'] == 9, data) + self.failUnless(len(data['sharemap']) == 9, data) + self.failUnless(data['count-shares-needed'] == 3, data) + self.failUnless(data['count-shares-expected'] == 10, data) + self.failUnless(data['count-good-share-hosts'] == 5, data) + self.failUnless(len(data['servers-responding']) == 5, data) + self.failUnless(len(data['list-corrupt-shares']) == 0, data) + self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data) + self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data) + self.failUnless(len(data['list-incompatible-shares']) == 1, data) + + d2.addCallback(_after_check) + return d2 + + for corruptor_func in ( + _corrupt_sharedata_version_number, + ): + d.addCallback(self._corrupt_a_random_share, corruptor_func) + d.addCallback(_check_after_share_incompatibility, corruptor_func=corruptor_func) + d.addCallback(_put_it_all_back) + + def _check_after_server_invisible_corruption(ignored, corruptor_func): + # Corruption which is not detected by the server means that the server will send you + # back the share data, but you will detect that it is wrong. + before_check_reads = self._count_reads() + d2 = self.filenode.check(Monitor(), verify=True) + + def _after_check(checkresults): + after_check_reads = self._count_reads() + # print "delta was ", after_check_reads - before_check_reads + self.failIf(after_check_reads - before_check_reads > DELTA_READS) + self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func)) + data = checkresults.get_data() + self.failUnless(data['count-shares-good'] == 9, data) + self.failUnless(data['count-shares-needed'] == 3, data) + self.failUnless(data['count-shares-expected'] == 10, data) + self.failUnless(data['count-good-share-hosts'] == 5, data) + self.failUnless(data['count-corrupt-shares'] == 1, (data, corruptor_func)) + self.failUnless(len(data['list-corrupt-shares']) == 1, data) + self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data) + self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data) + self.failUnless(len(data['list-incompatible-shares']) == 0, data) + self.failUnless(len(data['servers-responding']) == 5, data) + self.failUnless(len(data['sharemap']) == 9, data) + + d2.addCallback(_after_check) + return d2 + + for corruptor_func in ( + _corrupt_sharedata_version_number_to_known_version, + _corrupt_offset_of_sharedata, + _corrupt_offset_of_ciphertext_hash_tree, + _corrupt_offset_of_block_hashes, + _corrupt_offset_of_share_hashes, + _corrupt_offset_of_uri_extension, + _corrupt_share_data, + _corrupt_crypttext_hash_tree, + _corrupt_block_hashes, + _corrupt_share_hashes, + _corrupt_length_of_uri_extension, + _corrupt_uri_extension, + ): + d.addCallback(self._corrupt_a_random_share, corruptor_func) + d.addCallback(_check_after_server_invisible_corruption, corruptor_func=corruptor_func) + d.addCallback(_put_it_all_back) + return d + test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet." + + def test_repair(self): + """ Repair replaces a share that got deleted. """ + # N == 10. 7 is the "efficiency leeway" -- we'll allow you to pass this test even if + # you trigger seven times as many disk reads and blocks sends as would be optimal. + DELTA_READS = 10 * 7 + # We'll allow you to pass this test only if you repair the missing share using only a + # single allocate. + DELTA_ALLOCATES = 1 + + d = defer.succeed(self.filenode) + d.addCallback(self._delete_a_share, sharenum=2) + + def _repair_from_deletion_of_1(filenode): + before_repair_reads = self._count_reads() + before_repair_allocates = self._count_allocates() + + d2 = filenode.check_and_repair(Monitor(), verify=False) + def _after_repair(checkandrepairresults): + prerepairres = checkandrepairresults.get_pre_repair_results() + postrepairres = checkandrepairresults.get_post_repair_results() + after_repair_reads = self._count_reads() + after_repair_allocates = self._count_allocates() + + # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates + self.failIf(after_repair_reads - before_repair_reads > DELTA_READS) + self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES) + self.failIf(prerepairres.is_healthy()) + self.failUnless(postrepairres.is_healthy()) + + # Now we inspect the filesystem to make sure that it has 10 shares. + shares = self.find_shares() + self.failIf(len(shares) < 10) + + # Now we delete seven of the other shares, then try to download the file and + # assert that it succeeds at downloading and has the right contents. This can't + # work unless it has already repaired the previously-deleted share #2. + for sharenum in range(3, 10): + self._delete_a_share(sharenum=sharenum) + + return self._download_and_check_plaintext() + + d2.addCallback(_after_repair) + return d2 + d.addCallback(_repair_from_deletion_of_1) + + # Now we repair again to get all of those 7 back... + def _repair_from_deletion_of_7(filenode): + before_repair_reads = self._count_reads() + before_repair_allocates = self._count_allocates() + + d2 = filenode.check_and_repair(Monitor(), verify=False) + def _after_repair(checkandrepairresults): + prerepairres = checkandrepairresults.get_pre_repair_results() + postrepairres = checkandrepairresults.get_post_repair_results() + after_repair_reads = self._count_reads() + after_repair_allocates = self._count_allocates() + + # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates + self.failIf(after_repair_reads - before_repair_reads > DELTA_READS) + self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7)) + self.failIf(prerepairres.is_healthy()) + self.failUnless(postrepairres.is_healthy()) + + # Now we inspect the filesystem to make sure that it has 10 shares. + shares = self.find_shares() + self.failIf(len(shares) < 10) + + return self._download_and_check_plaintext() + + d2.addCallback(_after_repair) + return d2 + d.addCallback(_repair_from_deletion_of_7) + + def _repair_from_corruption(filenode): + before_repair_reads = self._count_reads() + before_repair_allocates = self._count_allocates() + + d2 = filenode.check_and_repair(Monitor(), verify=False) + def _after_repair(checkandrepairresults): + prerepairres = checkandrepairresults.get_pre_repair_results() + postrepairres = checkandrepairresults.get_post_repair_results() + after_repair_reads = self._count_reads() + after_repair_allocates = self._count_allocates() + + # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates + self.failIf(after_repair_reads - before_repair_reads > DELTA_READS) + self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES) + self.failIf(prerepairres.is_healthy()) + self.failUnless(postrepairres.is_healthy()) + + return self._download_and_check_plaintext() + + d2.addCallback(_after_repair) + return d2 + + for corruptor_func in ( + _corrupt_file_version_number, + _corrupt_sharedata_version_number, + _corrupt_sharedata_version_number_to_known_version, + _corrupt_offset_of_sharedata, + _corrupt_offset_of_ciphertext_hash_tree, + _corrupt_offset_of_block_hashes, + _corrupt_offset_of_share_hashes, + _corrupt_offset_of_uri_extension, + _corrupt_share_data, + _corrupt_crypttext_hash_tree, + _corrupt_block_hashes, + _corrupt_share_hashes, + _corrupt_length_of_uri_extension, + _corrupt_uri_extension, + ): + # Now we corrupt a share... + d.addCallback(self._corrupt_a_random_share, corruptor_func) + # And repair... + d.addCallback(_repair_from_corruption) + + return d + test_repair.todo = "We haven't implemented a repairer yet." + + +# XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run + +# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example + +# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit diff --git a/src/allmydata/test/test_immutable_checker.py b/src/allmydata/test/test_immutable_checker.py deleted file mode 100644 index a9b23616..00000000 --- a/src/allmydata/test/test_immutable_checker.py +++ /dev/null @@ -1,680 +0,0 @@ - -from allmydata.test.common import SystemTestMixin, ShareManglingMixin -from allmydata.monitor import Monitor -from allmydata.interfaces import IURI, NotEnoughSharesError -from allmydata.immutable import upload -from allmydata.util import log -from twisted.internet import defer -from twisted.trial import unittest -import random, struct -import common_util as testutil - -TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1) - -def corrupt_field(data, offset, size, debug=False): - if random.random() < 0.5: - newdata = testutil.flip_one_bit(data, offset, size) - if debug: - log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size])) - return newdata - else: - newval = testutil.insecurerandstr(size) - if debug: - log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval)) - return data[:offset]+newval+data[offset+size:] - -def _corrupt_file_version_number(data): - """ Scramble the file data -- the share file version number have one bit flipped or else - will be changed to a random value.""" - return corrupt_field(data, 0x00, 4) - -def _corrupt_size_of_file_data(data): - """ Scramble the file data -- the field showing the size of the share data within the file - will be set to one smaller. """ - return corrupt_field(data, 0x04, 4) - -def _corrupt_sharedata_version_number(data): - """ Scramble the file data -- the share data version number will have one bit flipped or - else will be changed to a random value, but not 1 or 2.""" - return corrupt_field(data, 0x0c, 4) - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - newsharevernum = sharevernum - while newsharevernum in (1, 2): - newsharevernum = random.randrange(0, 2**32) - newsharevernumbytes = struct.pack(">l", newsharevernum) - return data[:0x0c] + newsharevernumbytes + data[0x0c+4:] - -def _corrupt_sharedata_version_number_to_known_version(data): - """ Scramble the file data -- the share data version number will - be changed to 2 if it is 1 or else to 1 if it is 2.""" - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - newsharevernum = 2 - else: - newsharevernum = 1 - newsharevernumbytes = struct.pack(">l", newsharevernum) - return data[:0x0c] + newsharevernumbytes + data[0x0c+4:] - -def _corrupt_segment_size(data): - """ Scramble the file data -- the field showing the size of the segment will have one - bit flipped or else be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - return corrupt_field(data, 0x0c+0x04, 4, debug=True) - else: - return corrupt_field(data, 0x0c+0x04, 8, debug=True) - -def _corrupt_size_of_sharedata(data): - """ Scramble the file data -- the field showing the size of the data within the share - data will have one bit flipped or else will be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - return corrupt_field(data, 0x0c+0x08, 4) - else: - return corrupt_field(data, 0x0c+0x0c, 8) - -def _corrupt_offset_of_sharedata(data): - """ Scramble the file data -- the field showing the offset of the data within the share - data will have one bit flipped or else be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - return corrupt_field(data, 0x0c+0x0c, 4) - else: - return corrupt_field(data, 0x0c+0x14, 8) - -def _corrupt_offset_of_ciphertext_hash_tree(data): - """ Scramble the file data -- the field showing the offset of the ciphertext hash tree - within the share data will have one bit flipped or else be changed to a random value. - """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - return corrupt_field(data, 0x0c+0x14, 4, debug=True) - else: - return corrupt_field(data, 0x0c+0x24, 8, debug=True) - -def _corrupt_offset_of_block_hashes(data): - """ Scramble the file data -- the field showing the offset of the block hash tree within - the share data will have one bit flipped or else will be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - return corrupt_field(data, 0x0c+0x18, 4) - else: - return corrupt_field(data, 0x0c+0x2c, 8) - -def _corrupt_offset_of_share_hashes(data): - """ Scramble the file data -- the field showing the offset of the share hash tree within - the share data will have one bit flipped or else will be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - return corrupt_field(data, 0x0c+0x1c, 4) - else: - return corrupt_field(data, 0x0c+0x34, 8) - -def _corrupt_offset_of_uri_extension(data): - """ Scramble the file data -- the field showing the offset of the uri extension will - have one bit flipped or else will be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - return corrupt_field(data, 0x0c+0x20, 4) - else: - return corrupt_field(data, 0x0c+0x3c, 8) - -def _corrupt_share_data(data): - """ Scramble the file data -- the field containing the share data itself will have one - bit flipped or else will be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0] - - return corrupt_field(data, 0x0c+0x24, sharedatasize) - else: - sharedatasize = struct.unpack(">Q", data[0x0c+0x08:0x0c+0x0c+8])[0] - - return corrupt_field(data, 0x0c+0x44, sharedatasize) - -def _corrupt_crypttext_hash_tree(data): - """ Scramble the file data -- the field containing the crypttext hash tree will have one - bit flipped or else will be changed to a random value. - """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0] - blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0] - else: - crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0] - blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0] - - return corrupt_field(data, crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset) - -def _corrupt_block_hashes(data): - """ Scramble the file data -- the field containing the block hash tree will have one bit - flipped or else will be changed to a random value. - """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0] - sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0] - else: - blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0] - sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0] - - return corrupt_field(data, blockhashesoffset, sharehashesoffset-blockhashesoffset) - -def _corrupt_share_hashes(data): - """ Scramble the file data -- the field containing the share hash chain will have one - bit flipped or else will be changed to a random value. - """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0] - uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0] - else: - sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0] - uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0] - - return corrupt_field(data, sharehashesoffset, uriextoffset-sharehashesoffset) - -def _corrupt_length_of_uri_extension(data): - """ Scramble the file data -- the field showing the length of the uri extension will - have one bit flipped or else will be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0] - return corrupt_field(data, uriextoffset, 4) - else: - uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0] - return corrupt_field(data, uriextoffset, 8) - -def _corrupt_uri_extension(data): - """ Scramble the file data -- the field containing the uri extension will have one bit - flipped or else will be changed to a random value. """ - sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0] - assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways." - if sharevernum == 1: - uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0] - uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0] - else: - uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0] - uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0] - - return corrupt_field(data, uriextoffset, uriextlen) - -class Test(ShareManglingMixin, unittest.TestCase): - def setUp(self): - # Set self.basedir to a temp dir which has the name of the current test method in its - # name. - self.basedir = self.mktemp() - - d = defer.maybeDeferred(SystemTestMixin.setUp, self) - d.addCallback(lambda x: self.set_up_nodes()) - - def _upload_a_file(ignored): - d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence="")) - def _after_upload(u): - self.uri = IURI(u.uri) - return self.clients[0].create_node_from_uri(self.uri) - d2.addCallback(_after_upload) - return d2 - d.addCallback(_upload_a_file) - - def _stash_it(filenode): - self.filenode = filenode - d.addCallback(_stash_it) - return d - - def _download_and_check_plaintext(self, unused=None): - self.downloader = self.clients[1].getServiceNamed("downloader") - d = self.downloader.download_to_data(self.uri) - - def _after_download(result): - self.failUnlessEqual(result, TEST_DATA) - d.addCallback(_after_download) - return d - - def _delete_a_share(self, unused=None, sharenum=None): - """ Delete one share. """ - - shares = self.find_shares() - ks = shares.keys() - if sharenum is not None: - k = [ key for key in shares.keys() if key[1] == sharenum ][0] - else: - k = random.choice(ks) - del shares[k] - self.replace_shares(shares, storage_index=self.uri.storage_index) - - return unused - - def test_test_code(self): - # The following process of stashing the shares, running - # replace_shares, and asserting that the new set of shares equals the - # old is more to test this test code than to test the Tahoe code... - d = defer.succeed(None) - d.addCallback(self.find_shares) - stash = [None] - def _stash_it(res): - stash[0] = res - return res - d.addCallback(_stash_it) - d.addCallback(self.replace_shares, storage_index=self.uri.storage_index) - - def _compare(res): - oldshares = stash[0] - self.failUnless(isinstance(oldshares, dict), oldshares) - self.failUnlessEqual(oldshares, res) - - d.addCallback(self.find_shares) - d.addCallback(_compare) - - d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index)) - d.addCallback(self.find_shares) - d.addCallback(lambda x: self.failUnlessEqual(x, {})) - - # The following process of deleting 8 of the shares and asserting that you can't - # download it is more to test this test code than to test the Tahoe code... - def _then_delete_8(unused=None): - self.replace_shares(stash[0], storage_index=self.uri.storage_index) - for sharenum in range(2, 10): - self._delete_a_share() - d.addCallback(_then_delete_8) - - def _then_download(unused=None): - self.downloader = self.clients[1].getServiceNamed("downloader") - d = self.downloader.download_to_data(self.uri) - - def _after_download_callb(result): - self.fail() # should have gotten an errback instead - return result - def _after_download_errb(failure): - failure.trap(NotEnoughSharesError) - return None # success! - d.addCallbacks(_after_download_callb, _after_download_errb) - d.addCallback(_then_download) - - # The following process of leaving 8 of the shares deleted and asserting that you can't - # repair it is more to test this test code than to test the Tahoe code... - def _then_repair(unused=None): - d2 = self.filenode.check_and_repair(Monitor(), verify=False) - def _after_repair(checkandrepairresults): - prerepairres = checkandrepairresults.get_pre_repair_results() - postrepairres = checkandrepairresults.get_post_repair_results() - self.failIf(prerepairres.is_healthy()) - self.failIf(postrepairres.is_healthy()) - d2.addCallback(_after_repair) - return d2 - d.addCallback(_then_repair) - return d - - def _count_reads(self): - sum_of_read_counts = 0 - for client in self.clients: - counters = client.stats_provider.get_stats()['counters'] - sum_of_read_counts += counters.get('storage_server.read', 0) - return sum_of_read_counts - - def _count_allocates(self): - sum_of_allocate_counts = 0 - for client in self.clients: - counters = client.stats_provider.get_stats()['counters'] - sum_of_allocate_counts += counters.get('storage_server.allocate', 0) - return sum_of_allocate_counts - - def _corrupt_a_random_share(self, unused, corruptor_func): - """ Exactly one share on disk will be corrupted by corruptor_func. """ - shares = self.find_shares() - ks = shares.keys() - k = random.choice(ks) - - shares[k] = corruptor_func(shares[k]) - - self.replace_shares(shares, storage_index=self.uri.storage_index) - - def test_check_without_verify(self): - """ Check says the file is healthy when none of the shares have been touched. It says - that the file is unhealthy when all of them have been removed. It doesn't use any reads. - """ - d = defer.succeed(self.filenode) - def _check1(filenode): - before_check_reads = self._count_reads() - - d2 = filenode.check(Monitor(), verify=False) - def _after_check(checkresults): - after_check_reads = self._count_reads() - self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads) - self.failUnless(checkresults.is_healthy()) - - d2.addCallback(_after_check) - return d2 - d.addCallback(_check1) - - d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index)) - def _check2(ignored): - before_check_reads = self._count_reads() - d2 = self.filenode.check(Monitor(), verify=False) - - def _after_check(checkresults): - after_check_reads = self._count_reads() - self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads) - self.failIf(checkresults.is_healthy()) - - d2.addCallback(_after_check) - return d2 - d.addCallback(_check2) - - return d - - def test_check_with_verify(self): - """ Check says the file is healthy when none of the shares have been touched. It says - that the file is unhealthy if any field of any share has been corrupted. It doesn't use - more than twice as many reads as it needs. """ - LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and blocks sends as would be optimal. - DELTA_READS = 10 * LEEWAY # N = 10 - d = defer.succeed(self.filenode) - def _check_pristine(filenode): - before_check_reads = self._count_reads() - - d2 = filenode.check(Monitor(), verify=True) - def _after_check(checkresults): - after_check_reads = self._count_reads() - self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads, DELTA_READS)) - self.failUnless(checkresults.is_healthy()) - - d2.addCallback(_after_check) - return d2 - d.addCallback(_check_pristine) - - d.addCallback(self.find_shares) - stash = [None] - def _stash_it(res): - stash[0] = res - return res - d.addCallback(_stash_it) - - def _check_after_feckless_corruption(ignored, corruptor_func): - # Corruption which has no effect -- bits of the share file that are unused. - before_check_reads = self._count_reads() - d2 = self.filenode.check(Monitor(), verify=True) - - def _after_check(checkresults): - after_check_reads = self._count_reads() - self.failIf(after_check_reads - before_check_reads > DELTA_READS) - self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func)) - data = checkresults.get_data() - self.failUnless(data['count-shares-good'] == 10, data) - self.failUnless(len(data['sharemap']) == 10, data) - self.failUnless(data['count-shares-needed'] == 3, data) - self.failUnless(data['count-shares-expected'] == 10, data) - self.failUnless(data['count-good-share-hosts'] == 5, data) - self.failUnless(len(data['servers-responding']) == 5, data) - self.failUnless(len(data['list-corrupt-shares']) == 0, data) - - d2.addCallback(_after_check) - return d2 - - def _put_it_all_back(ignored): - self.replace_shares(stash[0], storage_index=self.uri.storage_index) - return ignored - - for corruptor_func in ( - _corrupt_size_of_file_data, - _corrupt_size_of_sharedata, - _corrupt_segment_size, - ): - d.addCallback(self._corrupt_a_random_share, corruptor_func) - d.addCallback(_check_after_feckless_corruption, corruptor_func=corruptor_func) - d.addCallback(_put_it_all_back) - - def _check_after_server_visible_corruption(ignored, corruptor_func): - # Corruption which is detected by the server means that the server will send you - # back a Failure in response to get_bucket instead of giving you the share data. - before_check_reads = self._count_reads() - d2 = self.filenode.check(Monitor(), verify=True) - - def _after_check(checkresults): - after_check_reads = self._count_reads() - self.failIf(after_check_reads - before_check_reads > DELTA_READS) - self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func)) - data = checkresults.get_data() - # The server might fail to serve up its other share as well as the corrupted - # one, so count-shares-good could be 8 or 9. - self.failUnless(data['count-shares-good'] in (8, 9), data) - self.failUnless(len(data['sharemap']) in (8, 9,), data) - self.failUnless(data['count-shares-needed'] == 3, data) - self.failUnless(data['count-shares-expected'] == 10, data) - # The server may have served up the non-corrupted share, or it may not have, so - # the checker could have detected either 4 or 5 good servers. - self.failUnless(data['count-good-share-hosts'] in (4, 5), data) - self.failUnless(len(data['servers-responding']) in (4, 5), data) - # If the server served up the other share, then the checker should consider it good, else it should - # not. - self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data) - self.failUnless(len(data['list-corrupt-shares']) == 0, data) - - d2.addCallback(_after_check) - return d2 - - for corruptor_func in ( - _corrupt_file_version_number, - ): - d.addCallback(self._corrupt_a_random_share, corruptor_func) - d.addCallback(_check_after_server_visible_corruption, corruptor_func=corruptor_func) - d.addCallback(_put_it_all_back) - - def _check_after_share_incompatibility(ignored, corruptor_func): - # Corruption which means the share is indistinguishable from a share of an - # incompatible version. - before_check_reads = self._count_reads() - d2 = self.filenode.check(Monitor(), verify=True) - - def _after_check(checkresults): - after_check_reads = self._count_reads() - self.failIf(after_check_reads - before_check_reads > DELTA_READS) - self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func)) - data = checkresults.get_data() - self.failUnless(data['count-shares-good'] == 9, data) - self.failUnless(len(data['sharemap']) == 9, data) - self.failUnless(data['count-shares-needed'] == 3, data) - self.failUnless(data['count-shares-expected'] == 10, data) - self.failUnless(data['count-good-share-hosts'] == 5, data) - self.failUnless(len(data['servers-responding']) == 5, data) - self.failUnless(len(data['list-corrupt-shares']) == 0, data) - self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data) - self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data) - self.failUnless(len(data['list-incompatible-shares']) == 1, data) - - d2.addCallback(_after_check) - return d2 - - for corruptor_func in ( - _corrupt_sharedata_version_number, - ): - d.addCallback(self._corrupt_a_random_share, corruptor_func) - d.addCallback(_check_after_share_incompatibility, corruptor_func=corruptor_func) - d.addCallback(_put_it_all_back) - - def _check_after_server_invisible_corruption(ignored, corruptor_func): - # Corruption which is not detected by the server means that the server will send you - # back the share data, but you will detect that it is wrong. - before_check_reads = self._count_reads() - d2 = self.filenode.check(Monitor(), verify=True) - - def _after_check(checkresults): - after_check_reads = self._count_reads() - # print "delta was ", after_check_reads - before_check_reads - self.failIf(after_check_reads - before_check_reads > DELTA_READS) - self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func)) - data = checkresults.get_data() - self.failUnless(data['count-shares-good'] == 9, data) - self.failUnless(data['count-shares-needed'] == 3, data) - self.failUnless(data['count-shares-expected'] == 10, data) - self.failUnless(data['count-good-share-hosts'] == 5, data) - self.failUnless(data['count-corrupt-shares'] == 1, (data, corruptor_func)) - self.failUnless(len(data['list-corrupt-shares']) == 1, data) - self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data) - self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data) - self.failUnless(len(data['list-incompatible-shares']) == 0, data) - self.failUnless(len(data['servers-responding']) == 5, data) - self.failUnless(len(data['sharemap']) == 9, data) - - d2.addCallback(_after_check) - return d2 - - for corruptor_func in ( - _corrupt_sharedata_version_number_to_known_version, - _corrupt_offset_of_sharedata, - _corrupt_offset_of_ciphertext_hash_tree, - _corrupt_offset_of_block_hashes, - _corrupt_offset_of_share_hashes, - _corrupt_offset_of_uri_extension, - _corrupt_share_data, - _corrupt_crypttext_hash_tree, - _corrupt_block_hashes, - _corrupt_share_hashes, - _corrupt_length_of_uri_extension, - _corrupt_uri_extension, - ): - d.addCallback(self._corrupt_a_random_share, corruptor_func) - d.addCallback(_check_after_server_invisible_corruption, corruptor_func=corruptor_func) - d.addCallback(_put_it_all_back) - return d - test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet." - - def test_repair(self): - """ Repair replaces a share that got deleted. """ - # N == 10. 7 is the "efficiency leeway" -- we'll allow you to pass this test even if - # you trigger seven times as many disk reads and blocks sends as would be optimal. - DELTA_READS = 10 * 7 - # We'll allow you to pass this test only if you repair the missing share using only a - # single allocate. - DELTA_ALLOCATES = 1 - - d = defer.succeed(self.filenode) - d.addCallback(self._delete_a_share, sharenum=2) - - def _repair_from_deletion_of_1(filenode): - before_repair_reads = self._count_reads() - before_repair_allocates = self._count_allocates() - - d2 = filenode.check_and_repair(Monitor(), verify=False) - def _after_repair(checkandrepairresults): - prerepairres = checkandrepairresults.get_pre_repair_results() - postrepairres = checkandrepairresults.get_post_repair_results() - after_repair_reads = self._count_reads() - after_repair_allocates = self._count_allocates() - - # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates - self.failIf(after_repair_reads - before_repair_reads > DELTA_READS) - self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES) - self.failIf(prerepairres.is_healthy()) - self.failUnless(postrepairres.is_healthy()) - - # Now we inspect the filesystem to make sure that it has 10 shares. - shares = self.find_shares() - self.failIf(len(shares) < 10) - - # Now we delete seven of the other shares, then try to download the file and - # assert that it succeeds at downloading and has the right contents. This can't - # work unless it has already repaired the previously-deleted share #2. - for sharenum in range(3, 10): - self._delete_a_share(sharenum=sharenum) - - return self._download_and_check_plaintext() - - d2.addCallback(_after_repair) - return d2 - d.addCallback(_repair_from_deletion_of_1) - - # Now we repair again to get all of those 7 back... - def _repair_from_deletion_of_7(filenode): - before_repair_reads = self._count_reads() - before_repair_allocates = self._count_allocates() - - d2 = filenode.check_and_repair(Monitor(), verify=False) - def _after_repair(checkandrepairresults): - prerepairres = checkandrepairresults.get_pre_repair_results() - postrepairres = checkandrepairresults.get_post_repair_results() - after_repair_reads = self._count_reads() - after_repair_allocates = self._count_allocates() - - # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates - self.failIf(after_repair_reads - before_repair_reads > DELTA_READS) - self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7)) - self.failIf(prerepairres.is_healthy()) - self.failUnless(postrepairres.is_healthy()) - - # Now we inspect the filesystem to make sure that it has 10 shares. - shares = self.find_shares() - self.failIf(len(shares) < 10) - - return self._download_and_check_plaintext() - - d2.addCallback(_after_repair) - return d2 - d.addCallback(_repair_from_deletion_of_7) - - def _repair_from_corruption(filenode): - before_repair_reads = self._count_reads() - before_repair_allocates = self._count_allocates() - - d2 = filenode.check_and_repair(Monitor(), verify=False) - def _after_repair(checkandrepairresults): - prerepairres = checkandrepairresults.get_pre_repair_results() - postrepairres = checkandrepairresults.get_post_repair_results() - after_repair_reads = self._count_reads() - after_repair_allocates = self._count_allocates() - - # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates - self.failIf(after_repair_reads - before_repair_reads > DELTA_READS) - self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES) - self.failIf(prerepairres.is_healthy()) - self.failUnless(postrepairres.is_healthy()) - - return self._download_and_check_plaintext() - - d2.addCallback(_after_repair) - return d2 - - for corruptor_func in ( - _corrupt_file_version_number, - _corrupt_sharedata_version_number, - _corrupt_sharedata_version_number_to_known_version, - _corrupt_offset_of_sharedata, - _corrupt_offset_of_ciphertext_hash_tree, - _corrupt_offset_of_block_hashes, - _corrupt_offset_of_share_hashes, - _corrupt_offset_of_uri_extension, - _corrupt_share_data, - _corrupt_crypttext_hash_tree, - _corrupt_block_hashes, - _corrupt_share_hashes, - _corrupt_length_of_uri_extension, - _corrupt_uri_extension, - ): - # Now we corrupt a share... - d.addCallback(self._corrupt_a_random_share, corruptor_func) - # And repair... - d.addCallback(_repair_from_corruption) - - return d - test_repair.todo = "We haven't implemented a repairer yet." - - -# XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run - -# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example - -# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit