DeepCheckResults, DeepCheckAndRepairResults
from allmydata.mutable.common import CorruptShareError
from allmydata.mutable.layout import unpack_header
-from allmydata.storage.server import storage_index_to_dir
from allmydata.storage.mutable import MutableShareFile
from allmydata.util import hashutil, log, fileutil, pollmixin
from allmydata.util.assertutil import precondition
-from allmydata.util.consumer import download_to_data
from allmydata.stats import StatsGathererService
from allmydata.key_generator import KeyGeneratorService
import allmydata.test.common_util as testutil
TEST_DATA="\x02"*(immutable.upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
-class ShareManglingMixin(SystemTestMixin):
-
- def setUp(self):
- # Set self.basedir to a temp dir which has the name of the current
- # test method in its name.
- self.basedir = self.mktemp()
-
- d = defer.maybeDeferred(SystemTestMixin.setUp, self)
- d.addCallback(lambda x: self.set_up_nodes())
-
- def _upload_a_file(ignored):
- cl0 = self.clients[0]
- # We need multiple segments to test crypttext hash trees that are
- # non-trivial (i.e. they have more than just one hash in them).
- cl0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
- # Tests that need to test servers of happiness using this should
- # set their own value for happy -- the default (7) breaks stuff.
- cl0.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
- d2 = cl0.upload(immutable.upload.Data(TEST_DATA, convergence=""))
- def _after_upload(u):
- filecap = u.uri
- self.n = self.clients[1].create_node_from_uri(filecap)
- self.uri = uri.CHKFileURI.init_from_string(filecap)
- return cl0.create_node_from_uri(filecap)
- d2.addCallback(_after_upload)
- return d2
- d.addCallback(_upload_a_file)
-
- def _stash_it(filenode):
- self.filenode = filenode
- d.addCallback(_stash_it)
- return d
-
- def find_all_shares(self, unused=None):
- """Locate shares on disk. Returns a dict that maps
- (clientnum,sharenum) to a string that contains the share container
- (copied directly from the disk, containing leases etc). You can
- modify this dict and then call replace_shares() to modify the shares.
- """
- shares = {} # k: (i, sharenum), v: data
-
- for i, c in enumerate(self.clients):
- sharedir = c.getServiceNamed("storage").sharedir
- for (dirp, dirns, fns) in os.walk(sharedir):
- for fn in fns:
- try:
- sharenum = int(fn)
- except TypeError:
- # Whoops, I guess that's not a share file then.
- pass
- else:
- data = open(os.path.join(sharedir, dirp, fn), "rb").read()
- shares[(i, sharenum)] = data
-
- return shares
-
- def replace_shares(self, newshares, storage_index):
- """Replace shares on disk. Takes a dictionary in the same form
- as find_all_shares() returns."""
-
- for i, c in enumerate(self.clients):
- sharedir = c.getServiceNamed("storage").sharedir
- for (dirp, dirns, fns) in os.walk(sharedir):
- for fn in fns:
- try:
- sharenum = int(fn)
- except TypeError:
- # Whoops, I guess that's not a share file then.
- pass
- else:
- pathtosharefile = os.path.join(sharedir, dirp, fn)
- os.unlink(pathtosharefile)
- for ((clientnum, sharenum), newdata) in newshares.iteritems():
- if clientnum == i:
- fullsharedirp=os.path.join(sharedir, storage_index_to_dir(storage_index))
- fileutil.make_dirs(fullsharedirp)
- wf = open(os.path.join(fullsharedirp, str(sharenum)), "wb")
- wf.write(newdata)
- wf.close()
-
- def _delete_a_share(self, unused=None, sharenum=None):
- """ Delete one share. """
-
- shares = self.find_all_shares()
- ks = shares.keys()
- if sharenum is not None:
- k = [ key for key in shares.keys() if key[1] == sharenum ][0]
- else:
- k = random.choice(ks)
- del shares[k]
- self.replace_shares(shares, storage_index=self.uri.get_storage_index())
-
- return unused
-
- def _corrupt_a_share(self, unused, corruptor_func, sharenum):
- shares = self.find_all_shares()
- ks = [ key for key in shares.keys() if key[1] == sharenum ]
- assert ks, (shares.keys(), sharenum)
- k = ks[0]
- shares[k] = corruptor_func(shares[k])
- self.replace_shares(shares, storage_index=self.uri.get_storage_index())
- return corruptor_func
-
- def _corrupt_all_shares(self, unused, corruptor_func):
- """ All shares on disk will be corrupted by corruptor_func. """
- shares = self.find_all_shares()
- for k in shares.keys():
- self._corrupt_a_share(unused, corruptor_func, k[1])
- return corruptor_func
-
- def _corrupt_a_random_share(self, unused, corruptor_func):
- """ Exactly one share on disk will be corrupted by corruptor_func. """
- shares = self.find_all_shares()
- ks = shares.keys()
- k = random.choice(ks)
- self._corrupt_a_share(unused, corruptor_func, k[1])
- return k[1]
-
- def _count_reads(self):
- sum_of_read_counts = 0
- for thisclient in self.clients:
- counters = thisclient.stats_provider.get_stats()['counters']
- sum_of_read_counts += counters.get('storage_server.read', 0)
- return sum_of_read_counts
-
- def _count_allocates(self):
- sum_of_allocate_counts = 0
- for thisclient in self.clients:
- counters = thisclient.stats_provider.get_stats()['counters']
- sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
- return sum_of_allocate_counts
-
- def _count_writes(self):
- sum_of_write_counts = 0
- for thisclient in self.clients:
- counters = thisclient.stats_provider.get_stats()['counters']
- sum_of_write_counts += counters.get('storage_server.write', 0)
- return sum_of_write_counts
-
- def _download_and_check_plaintext(self, unused=None):
- d = download_to_data(self.n)
- def _after_download(result):
- self.failUnlessEqual(result, TEST_DATA)
- d.addCallback(_after_download)
- return d
-
class ShouldFailMixin:
def shouldFail(self, expected_failure, which, substring,
callable, *args, **kwargs):
-from allmydata.test import common
-from allmydata.interfaces import NotEnoughSharesError
-from allmydata.util.consumer import download_to_data
-from allmydata import uri
-from twisted.internet import defer
-from twisted.trial import unittest
import random
+from twisted.trial import unittest
+from twisted.internet import defer
+import mock
from foolscap.api import eventually
+
+from allmydata.test import common
+from allmydata.test.no_network import GridTestMixin
+from allmydata.test.common import TEST_DATA
+from allmydata import uri
from allmydata.util import log
+from allmydata.util.consumer import download_to_data
+from allmydata.interfaces import NotEnoughSharesError
+from allmydata.immutable.upload import Data
from allmydata.immutable.downloader import finder
-import mock
-
class MockNode(object):
def __init__(self, check_reneging, check_fetch_failed):
self.got = 0
return mocknode.when_finished()
-class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase):
+
+class Test(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
+ def startup(self, basedir):
+ self.basedir = basedir
+ self.set_up_grid(num_clients=2, num_servers=5)
+ c1 = self.g.clients[1]
+ # We need multiple segments to test crypttext hash trees that are
+ # non-trivial (i.e. they have more than just one hash in them).
+ c1.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
+ # Tests that need to test servers of happiness using this should
+ # set their own value for happy -- the default (7) breaks stuff.
+ c1.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
+ d = c1.upload(Data(TEST_DATA, convergence=""))
+ def _after_upload(ur):
+ self.uri = ur.uri
+ self.filenode = self.g.clients[0].create_node_from_uri(ur.uri)
+ return self.uri
+ d.addCallback(_after_upload)
+ return d
+
+ def _stash_shares(self, shares):
+ self.shares = shares
+
+ def _download_and_check_plaintext(self, ign=None):
+ num_reads = self._count_reads()
+ d = download_to_data(self.filenode)
+ def _after_download(result):
+ self.failUnlessEqual(result, TEST_DATA)
+ return self._count_reads() - num_reads
+ d.addCallback(_after_download)
+ return d
+
+ def _shuffled(self, num_shnums):
+ shnums = range(10)
+ random.shuffle(shnums)
+ return shnums[:num_shnums]
+
+ def _count_reads(self):
+ return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.read', 0)
+ for s in self.g.servers_by_number.values()])
+
+
+ def _count_allocates(self):
+ return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.allocate', 0)
+ for s in self.g.servers_by_number.values()])
+
+ def _count_writes(self):
+ return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.write', 0)
+ for s in self.g.servers_by_number.values()])
+
def test_test_code(self):
# The following process of stashing the shares, running
# replace_shares, and asserting that the new set of shares equals the
# old is more to test this test code than to test the Tahoe code...
- d = defer.succeed(None)
- d.addCallback(self.find_all_shares)
- stash = [None]
- def _stash_it(res):
- stash[0] = res
- return res
- d.addCallback(_stash_it)
+ d = self.startup("immutable/Test/code")
+ d.addCallback(self.copy_shares)
+ d.addCallback(self._stash_shares)
+ d.addCallback(self._download_and_check_plaintext)
# The following process of deleting 8 of the shares and asserting
# that you can't download it is more to test this test code than to
# test the Tahoe code...
- def _then_delete_8(unused=None):
- self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
- for i in range(8):
- self._delete_a_share()
+ def _then_delete_8(ign):
+ self.restore_all_shares(self.shares)
+ self.delete_shares_numbered(self.uri, range(8))
d.addCallback(_then_delete_8)
-
- def _then_download(unused=None):
- d2 = download_to_data(self.n)
-
- def _after_download_callb(result):
- self.fail() # should have gotten an errback instead
- return result
- def _after_download_errb(failure):
- failure.trap(NotEnoughSharesError)
- return None # success!
- d2.addCallbacks(_after_download_callb, _after_download_errb)
- return d2
- d.addCallback(_then_download)
-
+ d.addCallback(lambda ign:
+ self.shouldFail(NotEnoughSharesError, "download-2",
+ "ran out of shares",
+ download_to_data, self.filenode))
return d
def test_download(self):
tested by test code in other modules, but this module is also going
to test some more specific things about immutable download.)
"""
- d = defer.succeed(None)
- before_download_reads = self._count_reads()
- def _after_download(unused=None):
- after_download_reads = self._count_reads()
- #print before_download_reads, after_download_reads
- self.failIf(after_download_reads-before_download_reads > 41,
- (after_download_reads, before_download_reads))
+ d = self.startup("immutable/Test/download")
d.addCallback(self._download_and_check_plaintext)
+ def _after_download(ign):
+ num_reads = self._count_reads()
+ #print num_reads
+ self.failIf(num_reads > 41, num_reads)
d.addCallback(_after_download)
return d
def test_download_from_only_3_remaining_shares(self):
""" Test download after 7 random shares (of the 10) have been
removed."""
- d = defer.succeed(None)
- def _then_delete_7(unused=None):
- for i in range(7):
- self._delete_a_share()
- before_download_reads = self._count_reads()
- d.addCallback(_then_delete_7)
- def _after_download(unused=None):
- after_download_reads = self._count_reads()
- #print before_download_reads, after_download_reads
- self.failIf(after_download_reads-before_download_reads > 41, (after_download_reads, before_download_reads))
+ d = self.startup("immutable/Test/download_from_only_3_remaining_shares")
+ d.addCallback(lambda ign:
+ self.delete_shares_numbered(self.uri, range(7)))
d.addCallback(self._download_and_check_plaintext)
+ def _after_download(num_reads):
+ #print num_reads
+ self.failIf(num_reads > 41, num_reads)
d.addCallback(_after_download)
return d
def test_download_from_only_3_shares_with_good_crypttext_hash(self):
""" Test download after 7 random shares (of the 10) have had their
crypttext hash tree corrupted."""
- d = defer.succeed(None)
- def _then_corrupt_7(unused=None):
- shnums = range(10)
- random.shuffle(shnums)
- for i in shnums[:7]:
- self._corrupt_a_share(None, common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes, i)
- #before_download_reads = self._count_reads()
- d.addCallback(_then_corrupt_7)
+ d = self.startup("download_from_only_3_shares_with_good_crypttext_hash")
+ def _corrupt_7(ign):
+ c = common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes
+ self.corrupt_shares_numbered(self.uri, self._shuffled(7), c)
+ d.addCallback(_corrupt_7)
d.addCallback(self._download_and_check_plaintext)
return d
def test_download_abort_if_too_many_missing_shares(self):
""" Test that download gives up quickly when it realizes there aren't
enough shares out there."""
- for i in range(8):
- self._delete_a_share()
- d = self.shouldFail(NotEnoughSharesError, "delete 8", None,
- download_to_data, self.n)
+ d = self.startup("download_abort_if_too_many_missing_shares")
+ d.addCallback(lambda ign:
+ self.delete_shares_numbered(self.uri, range(8)))
+ d.addCallback(lambda ign:
+ self.shouldFail(NotEnoughSharesError, "delete 8",
+ "Last failure: None",
+ download_to_data, self.filenode))
# the new downloader pipelines a bunch of read requests in parallel,
# so don't bother asserting anything about the number of reads
return d
enough uncorrupted shares out there. It should be able to tell
because the corruption occurs in the sharedata version number, which
it checks first."""
- d = defer.succeed(None)
- def _then_corrupt_8(unused=None):
- shnums = range(10)
- random.shuffle(shnums)
- for shnum in shnums[:8]:
- self._corrupt_a_share(None, common._corrupt_sharedata_version_number, shnum)
- d.addCallback(_then_corrupt_8)
-
- before_download_reads = self._count_reads()
- def _attempt_to_download(unused=None):
- d2 = download_to_data(self.n)
-
- def _callb(res):
- self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
- def _errb(f):
- self.failUnless(f.check(NotEnoughSharesError))
- d2.addCallbacks(_callb, _errb)
+ d = self.startup("download_abort_if_too_many_corrupted_shares")
+ def _corrupt_8(ign):
+ c = common._corrupt_sharedata_version_number
+ self.corrupt_shares_numbered(self.uri, self._shuffled(8), c)
+ d.addCallback(_corrupt_8)
+ def _try_download(ign):
+ start_reads = self._count_reads()
+ d2 = self.shouldFail(NotEnoughSharesError, "corrupt 8",
+ "LayoutInvalid",
+ download_to_data, self.filenode)
+ def _check_numreads(ign):
+ num_reads = self._count_reads() - start_reads
+ #print num_reads
+
+ # To pass this test, you are required to give up before
+ # reading all of the share data. Actually, we could give up
+ # sooner than 45 reads, but currently our download code does
+ # 45 reads. This test then serves as a "performance
+ # regression detector" -- if you change download code so that
+ # it takes *more* reads, then this test will fail.
+ self.failIf(num_reads > 45, num_reads)
+ d2.addCallback(_check_numreads)
return d2
-
- d.addCallback(_attempt_to_download)
-
- def _after_attempt(unused=None):
- after_download_reads = self._count_reads()
- #print before_download_reads, after_download_reads
- # To pass this test, you are required to give up before reading
- # all of the share data. Actually, we could give up sooner than
- # 45 reads, but currently our download code does 45 reads. This
- # test then serves as a "performance regression detector" -- if
- # you change download code so that it takes *more* reads, then
- # this test will fail.
- self.failIf(after_download_reads-before_download_reads > 45,
- (after_download_reads, before_download_reads))
- d.addCallback(_after_attempt)
+ d.addCallback(_try_download)
return d