-from allmydata.test.common import SystemTestMixin, ShareManglingMixin
-from allmydata.monitor import Monitor
-from allmydata import check_results
-from allmydata.interfaces import IURI, NotEnoughSharesError
-from allmydata.immutable import upload
-from allmydata.util import log
-from twisted.internet import defer
+
+import random
+
from twisted.trial import unittest
-import random, struct
-import common_util as testutil
-
-TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
-
-def corrupt_field(data, offset, size, debug=False):
- if random.random() < 0.5:
- newdata = testutil.flip_one_bit(data, offset, size)
- if debug:
- log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size]))
- return newdata
- else:
- newval = testutil.insecurerandstr(size)
- if debug:
- log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval))
- return data[:offset]+newval+data[offset+size:]
-
-def _corrupt_nothing(data):
- """ Leave the data pristine. """
- return data
-
-def _corrupt_file_version_number(data):
- """ Scramble the file data -- the share file version number have one bit flipped or else
- will be changed to a random value."""
- return corrupt_field(data, 0x00, 4)
-
-def _corrupt_size_of_file_data(data):
- """ Scramble the file data -- the field showing the size of the share data within the file
- will be set to one smaller. """
- return corrupt_field(data, 0x04, 4)
-
-def _corrupt_sharedata_version_number(data):
- """ Scramble the file data -- the share data version number will have one bit flipped or
- else will be changed to a random value, but not 1 or 2."""
- return corrupt_field(data, 0x0c, 4)
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- newsharevernum = sharevernum
- while newsharevernum in (1, 2):
- newsharevernum = random.randrange(0, 2**32)
- newsharevernumbytes = struct.pack(">l", newsharevernum)
- return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
-
-def _corrupt_sharedata_version_number_to_plausible_version(data):
- """ Scramble the file data -- the share data version number will
- be changed to 2 if it is 1 or else to 1 if it is 2."""
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- newsharevernum = 2
- else:
- newsharevernum = 1
- newsharevernumbytes = struct.pack(">l", newsharevernum)
- return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
-
-def _corrupt_segment_size(data):
- """ Scramble the file data -- the field showing the size of the segment will have one
- bit flipped or else be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- return corrupt_field(data, 0x0c+0x04, 4, debug=False)
- else:
- return corrupt_field(data, 0x0c+0x04, 8, debug=False)
-
-def _corrupt_size_of_sharedata(data):
- """ Scramble the file data -- the field showing the size of the data within the share
- data will have one bit flipped or else will be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- return corrupt_field(data, 0x0c+0x08, 4)
- else:
- return corrupt_field(data, 0x0c+0x0c, 8)
-
-def _corrupt_offset_of_sharedata(data):
- """ Scramble the file data -- the field showing the offset of the data within the share
- data will have one bit flipped or else be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- return corrupt_field(data, 0x0c+0x0c, 4)
- else:
- return corrupt_field(data, 0x0c+0x14, 8)
-
-def _corrupt_offset_of_ciphertext_hash_tree(data):
- """ Scramble the file data -- the field showing the offset of the ciphertext hash tree
- within the share data will have one bit flipped or else be changed to a random value.
- """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- return corrupt_field(data, 0x0c+0x14, 4, debug=False)
- else:
- return corrupt_field(data, 0x0c+0x24, 8, debug=False)
-
-def _corrupt_offset_of_block_hashes(data):
- """ Scramble the file data -- the field showing the offset of the block hash tree within
- the share data will have one bit flipped or else will be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- return corrupt_field(data, 0x0c+0x18, 4)
- else:
- return corrupt_field(data, 0x0c+0x2c, 8)
-
-def _corrupt_offset_of_share_hashes(data):
- """ Scramble the file data -- the field showing the offset of the share hash tree within
- the share data will have one bit flipped or else will be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- return corrupt_field(data, 0x0c+0x1c, 4)
- else:
- return corrupt_field(data, 0x0c+0x34, 8)
-
-def _corrupt_offset_of_uri_extension(data):
- """ Scramble the file data -- the field showing the offset of the uri extension will
- have one bit flipped or else will be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- return corrupt_field(data, 0x0c+0x20, 4)
- else:
- return corrupt_field(data, 0x0c+0x3c, 8)
-
-def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
- """ Scramble the file data -- the field showing the offset of the uri extension will be set
- to the size of the file minus 3. This means when the client tries to read the length field
- from that location it will get a short read -- the result string will be only 3 bytes long,
- not the 4 or 8 bytes necessary to do a successful struct.unpack."""
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- # The "-0x0c" in here is to skip the server-side header in the share file, which the client doesn't see when seeking and reading.
- if sharevernum == 1:
- if debug:
- log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x2c, 4, struct.unpack(">L", data[0x2c:0x2c+4])[0], len(data)-0x0c-3, len(data)))
- return data[:0x2c] + struct.pack(">L", len(data)-0x0c-3) + data[0x2c+4:]
- else:
- if debug:
- log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x48, 8, struct.unpack(">Q", data[0x48:0x48+8])[0], len(data)-0x0c-3, len(data)))
- return data[:0x48] + struct.pack(">Q", len(data)-0x0c-3) + data[0x48+8:]
-
-def _corrupt_share_data(data):
- """ Scramble the file data -- the field containing the share data itself will have one
- bit flipped or else will be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
-
- return corrupt_field(data, 0x0c+0x24, sharedatasize)
- else:
- sharedatasize = struct.unpack(">Q", data[0x0c+0x08:0x0c+0x0c+8])[0]
-
- return corrupt_field(data, 0x0c+0x44, sharedatasize)
-
-def _corrupt_crypttext_hash_tree(data):
- """ Scramble the file data -- the field containing the crypttext hash tree will have one
- bit flipped or else will be changed to a random value.
- """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
- blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
- else:
- crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
- blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
-
- return corrupt_field(data, crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)
-
-def _corrupt_block_hashes(data):
- """ Scramble the file data -- the field containing the block hash tree will have one bit
- flipped or else will be changed to a random value.
- """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
- sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
- else:
- blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
- sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
-
- return corrupt_field(data, blockhashesoffset, sharehashesoffset-blockhashesoffset)
-
-def _corrupt_share_hashes(data):
- """ Scramble the file data -- the field containing the share hash chain will have one
- bit flipped or else will be changed to a random value.
- """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
- uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
- else:
- sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
- uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
-
- return corrupt_field(data, sharehashesoffset, uriextoffset-sharehashesoffset)
-
-def _corrupt_length_of_uri_extension(data):
- """ Scramble the file data -- the field showing the length of the uri extension will
- have one bit flipped or else will be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
- return corrupt_field(data, uriextoffset, 4)
- else:
- uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
- return corrupt_field(data, uriextoffset, 8)
-
-def _corrupt_uri_extension(data):
- """ Scramble the file data -- the field containing the uri extension will have one bit
- flipped or else will be changed to a random value. """
- sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
- assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
- if sharevernum == 1:
- uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
- uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
- else:
- uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
- uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]
-
- return corrupt_field(data, uriextoffset, uriextlen)
-
-class Test(ShareManglingMixin, unittest.TestCase):
- def setUp(self):
- # Set self.basedir to a temp dir which has the name of the current test method in its
- # name.
- self.basedir = self.mktemp()
-
- d = defer.maybeDeferred(SystemTestMixin.setUp, self)
- d.addCallback(lambda x: self.set_up_nodes())
-
- def _upload_a_file(ignored):
- d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
- def _after_upload(u):
- self.uri = IURI(u.uri)
- return self.clients[0].create_node_from_uri(self.uri)
- d2.addCallback(_after_upload)
- return d2
- d.addCallback(_upload_a_file)
+from twisted.internet import defer
+from foolscap.api import eventually
- def _stash_it(filenode):
- self.filenode = filenode
- d.addCallback(_stash_it)
+from allmydata.test import common
+from allmydata.test.no_network import GridTestMixin
+from allmydata.test.common import TEST_DATA
+from allmydata import uri
+from allmydata.util import log
+from allmydata.util.consumer import download_to_data
+
+from allmydata.interfaces import NotEnoughSharesError
+from allmydata.immutable.upload import Data
+from allmydata.immutable.downloader import finder
+
+
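+# MockShareHashTree and MockNode below are minimal stand-ins for the node
+# side of the download machinery: MockNode counts the shares it is handed
+# and, when check_reneging is set, fails the test if a share arrives after
+# no_more_shares() has already been called (the "reneging" behavior that
+# ticket #1191 is about).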
+class MockShareHashTree(object):
+ def needed_hashes(self):
+ return False
+
+class MockNode(object):
+ def __init__(self, check_reneging, check_fetch_failed):
+ self.got = 0
+ self.finished_d = defer.Deferred()
+ self.segment_size = 78
+ self.guessed_segment_size = 78
+ self._no_more_shares = False
+ self.check_reneging = check_reneging
+ self.check_fetch_failed = check_fetch_failed
+ self._si_prefix='aa'
+ self.have_UEB = True
+ self.share_hash_tree = MockShareHashTree()
+ self.on_want_more_shares = None
+
+ def when_finished(self):
+ return self.finished_d
+ def get_num_segments(self):
+ return (5, True)
+ def _calculate_sizes(self, guessed_segment_size):
+ return {'block_size': 4, 'num_segments': 5}
+ def no_more_shares(self):
+ self._no_more_shares = True
+ def got_shares(self, shares):
+ if self.check_reneging:
+ if self._no_more_shares:
+ self.finished_d.errback(unittest.FailTest("The node was told by the share finder that it is destined to remain hungry, then was given another share."))
+ return
+ self.got += len(shares)
+ log.msg("yyy 3 %s.got_shares(%s) got: %s" % (self, shares, self.got))
+ if self.got == 3:
+ self.finished_d.callback(True)
+ def get_desired_ciphertext_hashes(self, *args, **kwargs):
+ return iter([])
+ def fetch_failed(self, *args, **kwargs):
+ if self.check_fetch_failed:
+ if self.finished_d:
+ self.finished_d.errback(unittest.FailTest("The node was told by the segment fetcher that the download failed."))
+ self.finished_d = None
+ def want_more_shares(self):
+ if self.on_want_more_shares:
+ self.on_want_more_shares()
+ def process_blocks(self, *args, **kwargs):
+ if self.finished_d:
+ self.finished_d.callback(None)
+
+class TestShareFinder(unittest.TestCase):
+ def test_no_reneging_on_no_more_shares_ever(self):
+ # ticket #1191
+
+ # Suppose that K=3 and you send two DYHB requests, the first
+ # response offers two shares, and then the last offers one
+ # share. If you tell your share consumer "no more shares,
+ # ever", and then immediately tell them "oh, and here's
+ # another share", then you lose.
+
+ rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
+ vcap = rcap.get_verify_cap()
+
+ class MockBuckets(object):
+ pass
+
+ class MockServer(object):
+ def __init__(self, buckets):
+ self.version = {
+ 'http://allmydata.org/tahoe/protocols/storage/v1': {
+ "tolerates-immutable-read-overrun": True
+ }
+ }
+ self.buckets = buckets
+ self.d = defer.Deferred()
+ self.s = None
+ def callRemote(self, methname, *args, **kwargs):
+ d = defer.Deferred()
+
+ # Even after the 3rd answer we're still hungry because
+ # we're interested in finding a share on a 3rd server
+ # so we don't have to download more than one share
+ # from the first server. This is actually necessary to
+ # trigger the bug.
+ def _give_buckets_and_hunger_again():
+ d.callback(self.buckets)
+ self.s.hungry()
+ eventually(_give_buckets_and_hunger_again)
+ return d
+
+ class MockIServer(object):
+ def __init__(self, serverid, rref):
+ self.serverid = serverid
+ self.rref = rref
+ def get_serverid(self):
+ return self.serverid
+ def get_rref(self):
+ return self.rref
+ def get_name(self):
+ return "name-%s" % self.serverid
+ def get_version(self):
+ return self.rref.version
+
+ class MockStorageBroker(object):
+ def __init__(self, servers):
+ self.servers = servers
+ def get_servers_for_psi(self, si):
+ return self.servers
+
+ class MockDownloadStatus(object):
+ def add_dyhb_request(self, server, when):
+ return MockDYHBEvent()
+
+ class MockDYHBEvent(object):
+ def finished(self, shnums, when):
+ pass
+
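+ # Three servers: the first holds shares 1 and 2, the second holds
+ # nothing, and the third holds share 3. With k=3 the finder stays
+ # hungry until the third server answers, which is what makes the
+ # reneging bug reachable.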
+ mockserver1 = MockServer({1: MockBuckets(), 2: MockBuckets()})
+ mockserver2 = MockServer({})
+ mockserver3 = MockServer({3: MockBuckets()})
+ servers = [ MockIServer("ms1", mockserver1),
+ MockIServer("ms2", mockserver2),
+ MockIServer("ms3", mockserver3), ]
+ mockstoragebroker = MockStorageBroker(servers)
+ mockdownloadstatus = MockDownloadStatus()
+ mocknode = MockNode(check_reneging=True, check_fetch_failed=True)
+
+ s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)
+
+ mockserver1.s = s
+ mockserver2.s = s
+ mockserver3.s = s
+
+ s.hungry()
+
+ return mocknode.when_finished()
+
+
+class Test(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
+ def startup(self, basedir):
+ self.basedir = basedir
+ self.set_up_grid(num_clients=2, num_servers=5)
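+ # Client 1 does the upload; client 0 creates the node that the
+ # download tests read from.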
+ c1 = self.g.clients[1]
+ # We need multiple segments to test crypttext hash trees that are
+ # non-trivial (i.e. they have more than just one hash in them).
+ c1.encoding_params['max_segment_size'] = 12
+ # Tests that need to exercise servers of happiness using this setup
+ # should set their own value for happy -- the default of 7 cannot be
+ # satisfied on this 5-server grid.
+ c1.encoding_params['happy'] = 1
+ d = c1.upload(Data(TEST_DATA, convergence=""))
+ def _after_upload(ur):
+ self.uri = ur.get_uri()
+ self.filenode = self.g.clients[0].create_node_from_uri(ur.get_uri())
+ return self.uri
+ d.addCallback(_after_upload)
return d
- def _download_and_check_plaintext(self, unused=None):
- self.downloader = self.clients[1].getServiceNamed("downloader")
- d = self.downloader.download_to_data(self.uri)
+ def _stash_shares(self, shares):
+ self.shares = shares
+ def _download_and_check_plaintext(self, ign=None):
+ num_reads = self._count_reads()
+ d = download_to_data(self.filenode)
def _after_download(result):
self.failUnlessEqual(result, TEST_DATA)
+ return self._count_reads() - num_reads
d.addCallback(_after_download)
return d
- def _delete_a_share(self, unused=None, sharenum=None):
- """ Delete one share. """
+ def _shuffled(self, num_shnums):
+ shnums = range(10)
+ random.shuffle(shnums)
+ return shnums[:num_shnums]
- shares = self.find_shares()
- ks = shares.keys()
- if sharenum is not None:
- k = [ key for key in shares.keys() if key[1] == sharenum ][0]
- else:
- k = random.choice(ks)
- del shares[k]
- self.replace_shares(shares, storage_index=self.uri.storage_index)
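+ # These helpers sum the per-server operation counters; the tests below
+ # compare before/after deltas as a rough performance-regression check.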
+ def _count_reads(self):
+ return sum([s.stats_provider.get_stats()['counters'].get('storage_server.read', 0)
+ for s in self.g.servers_by_number.values()])
+
+
+ def _count_allocates(self):
+ return sum([s.stats_provider.get_stats()['counters'].get('storage_server.allocate', 0)
+ for s in self.g.servers_by_number.values()])
- return unused
+ def _count_writes(self):
+ return sum([s.stats_provider.get_stats()['counters'].get('storage_server.write', 0)
+ for s in self.g.servers_by_number.values()])
def test_test_code(self):
# The following process of stashing the shares, running
# replace_shares, and asserting that the new set of shares equals the
# old is more to test this test code than to test the Tahoe code...
- d = defer.succeed(None)
- d.addCallback(self.find_shares)
- stash = [None]
- def _stash_it(res):
- stash[0] = res
- return res
- d.addCallback(_stash_it)
- d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)
-
- def _compare(res):
- oldshares = stash[0]
- self.failUnless(isinstance(oldshares, dict), oldshares)
- self.failUnlessEqual(oldshares, res)
-
- d.addCallback(self.find_shares)
- d.addCallback(_compare)
-
- d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
- d.addCallback(self.find_shares)
- d.addCallback(lambda x: self.failUnlessEqual(x, {}))
-
- # The following process of deleting 8 of the shares and asserting that you can't
- # download it is more to test this test code than to test the Tahoe code...
- def _then_delete_8(unused=None):
- self.replace_shares(stash[0], storage_index=self.uri.storage_index)
- for i in range(8):
- self._delete_a_share()
- d.addCallback(_then_delete_8)
+ d = self.startup("immutable/Test/code")
+ d.addCallback(self.copy_shares)
+ d.addCallback(self._stash_shares)
+ d.addCallback(self._download_and_check_plaintext)
- def _then_download(unused=None):
- self.downloader = self.clients[1].getServiceNamed("downloader")
- d = self.downloader.download_to_data(self.uri)
-
- def _after_download_callb(result):
- self.fail() # should have gotten an errback instead
- return result
- def _after_download_errb(failure):
- failure.trap(NotEnoughSharesError)
- return None # success!
- d.addCallbacks(_after_download_callb, _after_download_errb)
- d.addCallback(_then_download)
-
- # The following process of leaving 8 of the shares deleted and asserting that you can't
- # repair it is more to test this test code than to test the Tahoe code...
- #TODO def _then_repair(unused=None):
- #TODO d2 = self.filenode.check_and_repair(Monitor(), verify=False)
- #TODO def _after_repair(checkandrepairresults):
- #TODO prerepairres = checkandrepairresults.get_pre_repair_results()
- #TODO postrepairres = checkandrepairresults.get_post_repair_results()
- #TODO self.failIf(prerepairres.is_healthy())
- #TODO self.failIf(postrepairres.is_healthy())
- #TODO d2.addCallback(_after_repair)
- #TODO return d2
- #TODO d.addCallback(_then_repair)
+ # The following process of deleting 8 of the shares and asserting
+ # that you can't download it is more to test this test code than to
+ # test the Tahoe code...
+ def _then_delete_8(ign):
+ self.restore_all_shares(self.shares)
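+ # Deleting 8 of the 10 shares leaves only 2; with k=3 the download
+ # attempted below must fail with NotEnoughSharesError.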
+ self.delete_shares_numbered(self.uri, range(8))
+ d.addCallback(_then_delete_8)
+ d.addCallback(lambda ign:
+ self.shouldFail(NotEnoughSharesError, "download-2",
+ "ran out of shares",
+ download_to_data, self.filenode))
return d
- def _count_reads(self):
- sum_of_read_counts = 0
- for client in self.clients:
- counters = client.stats_provider.get_stats()['counters']
- sum_of_read_counts += counters.get('storage_server.read', 0)
- return sum_of_read_counts
-
- def _count_allocates(self):
- sum_of_allocate_counts = 0
- for client in self.clients:
- counters = client.stats_provider.get_stats()['counters']
- sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
- return sum_of_allocate_counts
-
- def _corrupt_a_share(self, unused, corruptor_func, sharenum):
- shares = self.find_shares()
- ks = [ key for key in shares.keys() if key[1] == sharenum ]
- assert ks, (shares.keys(), sharenum)
- k = ks[0]
- shares[k] = corruptor_func(shares[k])
- self.replace_shares(shares, storage_index=self.uri.storage_index)
- return corruptor_func
-
- def _corrupt_all_shares(self, unused, corruptor_func):
- """ All shares on disk will be corrupted by corruptor_func. """
- shares = self.find_shares()
- for k in shares.keys():
- self._corrupt_a_share(unused, corruptor_func, k[1])
- return corruptor_func
-
- def _corrupt_a_random_share(self, unused, corruptor_func):
- """ Exactly one share on disk will be corrupted by corruptor_func. """
- shares = self.find_shares()
- ks = shares.keys()
- k = random.choice(ks)
- self._corrupt_a_share(unused, corruptor_func, k[1])
- return corruptor_func
-
def test_download(self):
- """ Basic download. (This functionality is more or less already tested by test code in
- other modules, but this module is also going to test some more specific things about
- immutable download.)
+ """ Basic download. (This functionality is more or less already
+ tested by test code in other modules, but this module is also going
+ to test some more specific things about immutable download.)
"""
- d = defer.succeed(None)
- before_download_reads = self._count_reads()
- def _after_download(unused=None):
- after_download_reads = self._count_reads()
- # To pass this test, you have to download the file using only 10 reads total: 3 to
- # get the headers from each share, 3 to get the share hash trees and uebs from each
- # share, 1 to get the crypttext hashes, and 3 to get the block data from each share.
- self.failIf(after_download_reads-before_download_reads > 12, (after_download_reads, before_download_reads))
+ d = self.startup("immutable/Test/download")
d.addCallback(self._download_and_check_plaintext)
+ def _after_download(ign):
+ num_reads = self._count_reads()
+ #print num_reads
+ self.failIf(num_reads > 41, num_reads)
d.addCallback(_after_download)
return d
def test_download_from_only_3_remaining_shares(self):
- """ Test download after 7 random shares (of the 10) have been removed. """
- d = defer.succeed(None)
- def _then_delete_7(unused=None):
- for i in range(7):
- self._delete_a_share()
- before_download_reads = self._count_reads()
- d.addCallback(_then_delete_7)
- def _after_download(unused=None):
- after_download_reads = self._count_reads()
- # To pass this test, you have to download the file using only 10 reads to get the
- # UEB (in parallel from all shares), plus one read for each of the 3 shares.
- self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
+ """ Test download after 7 random shares (of the 10) have been
+ removed."""
+ d = self.startup("immutable/Test/download_from_only_3_remaining_shares")
+ d.addCallback(lambda ign:
+ self.delete_shares_numbered(self.uri, range(7)))
d.addCallback(self._download_and_check_plaintext)
+ def _after_download(num_reads):
+ #print num_reads
+ self.failIf(num_reads > 41, num_reads)
d.addCallback(_after_download)
return d
- def test_download_abort_if_too_many_missing_shares(self):
- """ Test that download gives up quickly when it realizes there aren't enough shares out
- there."""
- d = defer.succeed(None)
- def _then_delete_8(unused=None):
- for i in range(8):
- self._delete_a_share()
- d.addCallback(_then_delete_8)
-
- before_download_reads = self._count_reads()
- def _attempt_to_download(unused=None):
- downloader = self.clients[1].getServiceNamed("downloader")
- d = downloader.download_to_data(self.uri)
-
- def _callb(res):
- self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
- def _errb(f):
- self.failUnless(f.check(NotEnoughSharesError))
- d.addCallbacks(_callb, _errb)
- return d
-
- d.addCallback(_attempt_to_download)
-
- def _after_attempt(unused=None):
- after_download_reads = self._count_reads()
- # To pass this test, you are required to give up before actually trying to read any
- # share data.
- self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
- d.addCallback(_after_attempt)
+ def test_download_from_only_3_shares_with_good_crypttext_hash(self):
+ """ Test download after 7 random shares (of the 10) have had their
+ crypttext hash tree corrupted."""
+ d = self.startup("download_from_only_3_shares_with_good_crypttext_hash")
+ def _corrupt_7(ign):
+ c = common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes
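+ # Corrupt the crypttext hash tree in 7 random shares; the 3 shares
+ # left intact are exactly k, so the download below should still succeed.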
+ self.corrupt_shares_numbered(self.uri, self._shuffled(7), c)
+ d.addCallback(_corrupt_7)
+ d.addCallback(self._download_and_check_plaintext)
return d
- def test_download_abort_if_too_many_corrupted_shares(self):
- """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
- shares out there. It should be able to tell because the corruption occurs in the
- sharedata version number, which it checks first."""
- d = defer.succeed(None)
- def _then_corrupt_8(unused=None):
- shnums = range(10)
- random.shuffle(shnums)
- for shnum in shnums[:8]:
- self._corrupt_a_share(None, _corrupt_sharedata_version_number, shnum)
- d.addCallback(_then_corrupt_8)
-
- before_download_reads = self._count_reads()
- def _attempt_to_download(unused=None):
- downloader = self.clients[1].getServiceNamed("downloader")
- d = downloader.download_to_data(self.uri)
-
- def _callb(res):
- self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
- def _errb(f):
- self.failUnless(f.check(NotEnoughSharesError))
- d.addCallbacks(_callb, _errb)
- return d
-
- d.addCallback(_attempt_to_download)
-
- def _after_attempt(unused=None):
- after_download_reads = self._count_reads()
- # To pass this test, you are required to give up before reading all of the share
- # data. Actually, we could give up sooner than 45 reads, but currently our download
- # code does 45 reads. This test then serves as a "performance regression detector"
- # -- if you change download code so that it takes *more* reads, then this test will
- # fail.
- self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads))
- d.addCallback(_after_attempt)
+ def test_download_abort_if_too_many_missing_shares(self):
+ """ Test that download gives up quickly when it realizes there aren't
+ enough shares out there."""
+ d = self.startup("download_abort_if_too_many_missing_shares")
+ d.addCallback(lambda ign:
+ self.delete_shares_numbered(self.uri, range(8)))
+ d.addCallback(lambda ign:
+ self.shouldFail(NotEnoughSharesError, "delete 8",
+ "Last failure: None",
+ download_to_data, self.filenode))
+ # the new downloader pipelines a bunch of read requests in parallel,
+ # so don't bother asserting anything about the number of reads
return d
- def test_check_without_verify(self):
- """ Check says the file is healthy when none of the shares have been touched. It says
- that the file is unhealthy when all of them have been removed. It doesn't use any reads.
- """
- d = defer.succeed(self.filenode)
- def _check1(filenode):
- before_check_reads = self._count_reads()
-
- d2 = filenode.check(Monitor(), verify=False)
- def _after_check(checkresults):
- after_check_reads = self._count_reads()
- self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
- self.failUnless(checkresults.is_healthy())
-
- d2.addCallback(_after_check)
- return d2
- d.addCallback(_check1)
-
- d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
- def _check2(ignored):
- before_check_reads = self._count_reads()
- d2 = self.filenode.check(Monitor(), verify=False)
-
- def _after_check(checkresults):
- after_check_reads = self._count_reads()
- self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
- self.failIf(checkresults.is_healthy())
-
- d2.addCallback(_after_check)
+ def test_download_abort_if_too_many_corrupted_shares(self):
+ """Test that download gives up quickly when it realizes there aren't
+ enough uncorrupted shares out there. It should be able to tell
+ because the corruption occurs in the sharedata version number, which
+ it checks first."""
+ d = self.startup("download_abort_if_too_many_corrupted_shares")
+ def _corrupt_8(ign):
+ c = common._corrupt_sharedata_version_number
+ self.corrupt_shares_numbered(self.uri, self._shuffled(8), c)
+ d.addCallback(_corrupt_8)
+ def _try_download(ign):
+ start_reads = self._count_reads()
+ d2 = self.shouldFail(NotEnoughSharesError, "corrupt 8",
+ "LayoutInvalid",
+ download_to_data, self.filenode)
+ def _check_numreads(ign):
+ num_reads = self._count_reads() - start_reads
+ #print num_reads
+
+ # To pass this test, you are required to give up before
+ # reading all of the share data. Actually, we could give up
+ # sooner than 45 reads, but currently our download code does
+ # 45 reads. This test then serves as a "performance
+ # regression detector" -- if you change download code so that
+ # it takes *more* reads, then this test will fail.
+ self.failIf(num_reads > 45, num_reads)
+ d2.addCallback(_check_numreads)
return d2
- d.addCallback(_check2)
-
+ d.addCallback(_try_download)
return d
- def _help_test_verify(self, corruptor_funcs, judgement_func):
- LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and blocks sends as would be optimal.
- DELTA_READS = 10 * LEEWAY # N = 10
- d = defer.succeed(None)
-
- d.addCallback(self.find_shares)
- stash = [None]
- def _stash_it(res):
- stash[0] = res
- return res
- d.addCallback(_stash_it)
- def _put_it_all_back(ignored):
- self.replace_shares(stash[0], storage_index=self.uri.storage_index)
- return ignored
-
- def _verify_after_corruption(corruptor_func):
- before_check_reads = self._count_reads()
- d2 = self.filenode.check(Monitor(), verify=True)
- def _after_check(checkresults):
- after_check_reads = self._count_reads()
- self.failIf(after_check_reads - before_check_reads > DELTA_READS)
- try:
- return judgement_func(checkresults)
- except Exception, le:
- le.args = tuple(le.args + ("corruptor_func: " + corruptor_func.__name__,))
- raise
-
- d2.addCallback(_after_check)
- return d2
-
- for corruptor_func in corruptor_funcs:
- d.addCallback(self._corrupt_a_random_share, corruptor_func)
- d.addCallback(_verify_after_corruption)
- d.addCallback(_put_it_all_back)
-
+ def test_download_to_data(self):
+ d = self.startup("download_to_data")
+ d.addCallback(lambda ign: self.filenode.download_to_data())
+ d.addCallback(lambda data:
+ self.failUnlessEqual(data, common.TEST_DATA))
return d
- def test_verify_no_problem(self):
- """ Verify says the file is healthy when none of the shares have been touched in a way
- that matters. It doesn't use more than seven times as many reads as it needs."""
- def judge(checkresults):
- self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 10, data)
- self.failUnless(len(data['sharemap']) == 10, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['list-corrupt-shares']) == 0, data)
- return self._help_test_verify([
- _corrupt_nothing,
- _corrupt_size_of_file_data,
- _corrupt_size_of_sharedata,
- _corrupt_segment_size, ], judge)
-
- def test_verify_server_visible_corruption(self):
- """ Corruption which is detected by the server means that the server will send you back
- a Failure in response to get_bucket instead of giving you the share data. Test that
- verifier handles these answers correctly. It doesn't use more than seven times as many
- reads as it needs."""
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- # The server might fail to serve up its other share as well as the corrupted
- # one, so count-shares-good could be 8 or 9.
- self.failUnless(data['count-shares-good'] in (8, 9), data)
- self.failUnless(len(data['sharemap']) in (8, 9,), data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- # The server may have served up the non-corrupted share, or it may not have, so
- # the checker could have detected either 4 or 5 good servers.
- self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
- self.failUnless(len(data['servers-responding']) in (4, 5), data)
- # If the server served up the other share, then the checker should consider it good, else it should
- # not.
- self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
- self.failUnless(len(data['list-corrupt-shares']) == 0, data)
- return self._help_test_verify([
- _corrupt_file_version_number,
- ], judge)
-
- def test_verify_share_incompatibility(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- return self._help_test_verify([
- _corrupt_sharedata_version_number,
- ], judge)
-
- def test_verify_server_invisible_corruption(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(data['count-corrupt-shares'] == 1, (data,))
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- return self._help_test_verify([
- _corrupt_offset_of_sharedata,
- _corrupt_offset_of_uri_extension,
- _corrupt_offset_of_uri_extension_to_force_short_read,
- _corrupt_share_data,
- _corrupt_length_of_uri_extension,
- _corrupt_uri_extension,
- ], judge)
-
- def test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(data['count-corrupt-shares'] == 1, (data,))
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- return self._help_test_verify([
- _corrupt_offset_of_block_hashes,
- ], judge)
- test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
-
- def test_verify_server_invisible_corruption_sharedata_plausible_version(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(data['count-corrupt-shares'] == 1, (data,))
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- return self._help_test_verify([
- _corrupt_sharedata_version_number_to_plausible_version,
- ], judge)
-
- def test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(data['count-corrupt-shares'] == 1, (data,))
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- return self._help_test_verify([
- _corrupt_offset_of_share_hashes,
- ], judge)
- test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
-
- def test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(data['count-corrupt-shares'] == 1, (data,))
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- return self._help_test_verify([
- _corrupt_offset_of_ciphertext_hash_tree,
- ], judge)
- test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
-
- def test_verify_server_invisible_corruption_cryptext_hash_tree_TODO(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(data['count-corrupt-shares'] == 1, (data,))
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- return self._help_test_verify([
- _corrupt_crypttext_hash_tree,
- ], judge)
- test_verify_server_invisible_corruption_cryptext_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
-
- def test_verify_server_invisible_corruption_block_hash_tree_TODO(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(data['count-corrupt-shares'] == 1, (data,))
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- return self._help_test_verify([
- _corrupt_block_hashes,
- ], judge)
- test_verify_server_invisible_corruption_block_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
-
- def test_verify_server_invisible_corruption_share_hash_tree_TODO(self):
- def judge(checkresults):
- self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
- data = checkresults.get_data()
- self.failUnless(data['count-shares-good'] == 9, data)
- self.failUnless(data['count-shares-needed'] == 3, data)
- self.failUnless(data['count-shares-expected'] == 10, data)
- self.failUnless(data['count-good-share-hosts'] == 5, data)
- self.failUnless(data['count-corrupt-shares'] == 1, (data,))
- self.failUnless(len(data['list-corrupt-shares']) == 1, data)
- self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
- self.failUnless(len(data['list-incompatible-shares']) == 0, data)
- self.failUnless(len(data['servers-responding']) == 5, data)
- self.failUnless(len(data['sharemap']) == 9, data)
- return self._help_test_verify([
- _corrupt_share_hashes,
- ], judge)
- test_verify_server_invisible_corruption_share_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
-
- def test_repair(self):
- """ Repair replaces a share that got deleted. """
- # N == 10. 7 is the "efficiency leeway" -- we'll allow you to pass this test even if
- # you trigger seven times as many disk reads and blocks sends as would be optimal.
- DELTA_READS = 10 * 7
- # We'll allow you to pass this test only if you repair the missing share using only a
- # single allocate.
- DELTA_ALLOCATES = 1
-
- d = defer.succeed(self.filenode)
- d.addCallback(self._delete_a_share, sharenum=2)
-
- def _repair_from_deletion_of_1(filenode):
- before_repair_reads = self._count_reads()
- before_repair_allocates = self._count_allocates()
-
- d2 = filenode.check_and_repair(Monitor(), verify=False)
- def _after_repair(checkandrepairresults):
- assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
- prerepairres = checkandrepairresults.get_pre_repair_results()
- assert isinstance(prerepairres, check_results.CheckResults), prerepairres
- postrepairres = checkandrepairresults.get_post_repair_results()
- assert isinstance(postrepairres, check_results.CheckResults), postrepairres
- after_repair_reads = self._count_reads()
- after_repair_allocates = self._count_allocates()
-
- # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
- self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
- self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
- self.failIf(prerepairres.is_healthy())
- self.failUnless(postrepairres.is_healthy())
-
- # Now we inspect the filesystem to make sure that it has 10 shares.
- shares = self.find_shares()
- self.failIf(len(shares) < 10)
-
- # Now we delete seven of the other shares, then try to download the file and
- # assert that it succeeds at downloading and has the right contents. This can't
- # work unless it has already repaired the previously-deleted share #2.
- for sharenum in range(3, 10):
- self._delete_a_share(sharenum=sharenum)
-
- return self._download_and_check_plaintext()
-
- d2.addCallback(_after_repair)
- return d2
- d.addCallback(_repair_from_deletion_of_1)
-
- # Now we repair again to get all of those 7 back...
- def _repair_from_deletion_of_7(filenode):
- before_repair_reads = self._count_reads()
- before_repair_allocates = self._count_allocates()
-
- d2 = filenode.check_and_repair(Monitor(), verify=False)
- def _after_repair(checkandrepairresults):
- prerepairres = checkandrepairresults.get_pre_repair_results()
- postrepairres = checkandrepairresults.get_post_repair_results()
- after_repair_reads = self._count_reads()
- after_repair_allocates = self._count_allocates()
- # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
- self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
- self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
- self.failIf(prerepairres.is_healthy())
- self.failUnless(postrepairres.is_healthy())
-
- # Now we inspect the filesystem to make sure that it has 10 shares.
- shares = self.find_shares()
- self.failIf(len(shares) < 10)
-
- return self._download_and_check_plaintext()
-
- d2.addCallback(_after_repair)
- return d2
- d.addCallback(_repair_from_deletion_of_7)
-
- def _repair_from_corruption(filenode):
- before_repair_reads = self._count_reads()
- before_repair_allocates = self._count_allocates()
-
- d2 = filenode.check_and_repair(Monitor(), verify=False)
- def _after_repair(checkandrepairresults):
- prerepairres = checkandrepairresults.get_pre_repair_results()
- postrepairres = checkandrepairresults.get_post_repair_results()
- after_repair_reads = self._count_reads()
- after_repair_allocates = self._count_allocates()
-
- # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
- self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
- self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
- self.failIf(prerepairres.is_healthy())
- self.failUnless(postrepairres.is_healthy())
-
- return self._download_and_check_plaintext()
-
- d2.addCallback(_after_repair)
- return d2
+ def test_download_best_version(self):
+ d = self.startup("download_best_version")
+ d.addCallback(lambda ign: self.filenode.download_best_version())
+ d.addCallback(lambda data:
+ self.failUnlessEqual(data, common.TEST_DATA))
+ return d
- for corruptor_func in (
- _corrupt_file_version_number,
- _corrupt_sharedata_version_number,
- _corrupt_sharedata_version_number_to_plausible_version,
- _corrupt_offset_of_sharedata,
- _corrupt_offset_of_ciphertext_hash_tree,
- _corrupt_offset_of_block_hashes,
- _corrupt_offset_of_share_hashes,
- _corrupt_offset_of_uri_extension,
- _corrupt_share_data,
- _corrupt_crypttext_hash_tree,
- _corrupt_block_hashes,
- _corrupt_share_hashes,
- _corrupt_length_of_uri_extension,
- _corrupt_uri_extension,
- ):
- # Now we corrupt a share...
- d.addCallback(self._corrupt_a_random_share, corruptor_func)
- # And repair...
- d.addCallback(_repair_from_corruption)
+ def test_get_best_readable_version(self):
+ d = self.startup("get_best_readable_version")
+ d.addCallback(lambda ign: self.filenode.get_best_readable_version())
+ d.addCallback(lambda n2:
+ self.failUnlessEqual(n2, self.filenode))
return d
- test_repair.todo = "We haven't implemented a repairer yet."
+ def test_get_size_of_best_version(self):
+ d = self.startup("get_size_of_best_version")
+ d.addCallback(lambda ign: self.filenode.get_size_of_best_version())
+ d.addCallback(lambda size:
+ self.failUnlessEqual(size, len(common.TEST_DATA)))
+ return d
-# XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run
-# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example
+# XXX extend these tests to show bad behavior of various kinds from servers:
+# raising exception from each remove_foo() method, for example
# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
+
+# TODO: delete this whole file