From: David-Sarah Hopwood Date: Thu, 27 Dec 2012 00:00:17 +0000 (+0000) Subject: Remove ResponseCache in favor of MDMFSlotReadProxy's cache. closes #1240. X-Git-Tag: allmydata-tahoe-1.10a1~32 X-Git-Url: https://git.rkrishnan.org/pf/content/simplejson/frontends/flags?a=commitdiff_plain;h=4563ba456b1b2d640551956374b0cfbfc602dcec;p=tahoe-lafs%2Ftahoe-lafs.git Remove ResponseCache in favor of MDMFSlotReadProxy's cache. closes #1240. This contains several merged patches. Individual messages follow, latest first: * Fix a warning from check-miscaptures. * In retrieve.py, explicitly test whether a key is in self.servermap.proxies rather than catching KeyError. * Added a new comment to the MDMF version of the test I removed, explaining the removal of the SDMF version. * Removed test_corrupt_all_block_hash_tree_late, since the entire block_hash_tree is cached in the servermap for an SDMF file. * Fixed several tests that require files larger than the servermap cache. * Remove unused test_response_cache_memory_leak(). * Exercise the cache. * Test infrastructure for counting cache misses on MDMF files. * Removed the ResponseCache. Instead, the MDMFSlotReadProxy initialized by ServerMap is kept around so Retrieve can access it. The ReadProxy has a cache of the first 1000 bytes initially read from each share by the ServerMap. We're able to satisfy a number of requests out of this cache, so roundtrips are reduced from 84 to 60 in test_deepcheck_mdmf. There is still some mystery about under what conditions the cache has fewer than 1000 bytes. Also this breaks some existing unit tests that depend on the inner behavior of ResponseCache. * The servermap.proxies (a cache of SlotReadProxies) is now keyed by (verinfo,serverid,shnum) rather than just (serverid,shnum) * Minor cosmetic changes * Added a test failure if the number of cache misses is too high. Author: Andrew Miller Signed-off-by: David-Sarah Hopwood --- diff --git a/src/allmydata/mutable/common.py b/src/allmydata/mutable/common.py index 9ce8e37c..16f39b30 100644 --- a/src/allmydata/mutable/common.py +++ b/src/allmydata/mutable/common.py @@ -1,6 +1,4 @@ -from allmydata.util.spans import DataSpans - MODE_CHECK = "MODE_CHECK" # query all peers MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial @@ -59,56 +57,3 @@ class CorruptShareError(BadShareError): class UnknownVersionError(BadShareError): """The share we received was of a version we don't recognize.""" - -class ResponseCache: - """I cache share data, to reduce the number of round trips used during - mutable file operations. All of the data in my cache is for a single - storage index, but I will keep information on multiple shares for - that storage index. - - I maintain a highest-seen sequence number, and will flush all entries - each time this number increases (this doesn't necessarily imply that - all entries have the same sequence number). - - My cache is indexed by a (verinfo, shnum) tuple. - - My cache entries are DataSpans instances, each representing a set of - non-overlapping byteranges. - """ - - def __init__(self): - self.cache = {} - self.seqnum = None - - def _clear(self): - # also used by unit tests - self.cache = {} - - def add(self, verinfo, shnum, offset, data): - seqnum = verinfo[0] - if seqnum > self.seqnum: - self._clear() - self.seqnum = seqnum - - index = (verinfo, shnum) - if index in self.cache: - self.cache[index].add(offset, data) - else: - spans = DataSpans() - spans.add(offset, data) - self.cache[index] = spans - - def read(self, verinfo, shnum, offset, length): - """Try to satisfy a read request from cache. - Returns data, or None if the cache did not hold the entire requested span. - """ - - # TODO: perhaps return a DataSpans object representing the fragments - # that we have, instead of only returning a hit if we can satisfy the - # whole request from cache. - - index = (verinfo, shnum) - if index in self.cache: - return self.cache[index].get(offset, length) - else: - return None diff --git a/src/allmydata/mutable/filenode.py b/src/allmydata/mutable/filenode.py index 61ccd39c..50bbdd34 100644 --- a/src/allmydata/mutable/filenode.py +++ b/src/allmydata/mutable/filenode.py @@ -17,7 +17,7 @@ from pycryptopp.cipher.aes import AES from allmydata.mutable.publish import Publish, MutableData,\ TransformingUploadable from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \ - ResponseCache, UncoordinatedWriteError + UncoordinatedWriteError from allmydata.mutable.servermap import ServerMap, ServermapUpdater from allmydata.mutable.retrieve import Retrieve from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer @@ -65,7 +65,6 @@ class MutableFileNode: self._required_shares = default_encoding_parameters["k"] self._total_shares = default_encoding_parameters["n"] self._sharemap = {} # known shares, shnum-to-[nodeids] - self._cache = ResponseCache() self._most_recent_size = None # filled in after __init__ if we're being created for the first time; # filled in by the servermap updater before publishing, otherwise. @@ -180,10 +179,6 @@ class MutableFileNode: self._privkey = privkey def _populate_encprivkey(self, encprivkey): self._encprivkey = encprivkey - def _add_to_cache(self, verinfo, shnum, offset, data): - self._cache.add(verinfo, shnum, offset, data) - def _read_from_cache(self, verinfo, shnum, offset, length): - return self._cache.read(verinfo, shnum, offset, length) def get_write_enabler(self, server): seed = server.get_foolscap_write_enabler_seed() diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py index 33d647b9..75598a4d 100644 --- a/src/allmydata/mutable/layout.py +++ b/src/allmydata/mutable/layout.py @@ -1192,7 +1192,8 @@ class MDMFSlotReadProxy: rref, storage_index, shnum, - data=""): + data="", + data_is_everything=False): # Start the initialization process. self._rref = rref self._storage_index = storage_index @@ -1223,8 +1224,14 @@ class MDMFSlotReadProxy: # If the user has chosen to initialize us with some data, we'll # try to satisfy subsequent data requests with that data before - # asking the storage server for it. If + # asking the storage server for it. self._data = data + + # If the provided data is known to be complete, then we know there's + # nothing to be gained by querying the server, so we should just + # partially satisfy requests with what we have. + self._data_is_everything = data_is_everything + # The way callers interact with cache in the filenode returns # None if there isn't any cached data, but the way we index the # cached data requires a string, so convert None to "". @@ -1738,7 +1745,8 @@ class MDMFSlotReadProxy: # TODO: It's entirely possible to tweak this so that it just # fulfills the requests that it can, and not demand that all # requests are satisfiable before running it. - if not unsatisfiable and not force_remote: + + if not unsatisfiable or self._data_is_everything: results = [self._data[offset:offset+length] for (offset, length) in readvs] results = {self.shnum: results} diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 291fb120..77f8de27 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -286,16 +286,14 @@ class Retrieve: self.remaining_sharemap = DictOfSets() for (shnum, server, timestamp) in shares: self.remaining_sharemap.add(shnum, server) - # If the servermap update fetched anything, it fetched at least 1 - # KiB, so we ask for that much. - # TODO: Change the cache methods to allow us to fetch all of the - # data that they have, then change this method to do that. - any_cache = self._node._read_from_cache(self.verinfo, shnum, - 0, 1000) - reader = MDMFSlotReadProxy(server.get_rref(), - self._storage_index, - shnum, - any_cache) + # Reuse the SlotReader from the servermap. + key = (self.verinfo, server.get_serverid(), + self._storage_index, shnum) + if key in self.servermap.proxies: + reader = self.servermap.proxies[key] + else: + reader = MDMFSlotReadProxy(server.get_rref(), + self._storage_index, shnum, None) reader.server = server self.readers[shnum] = reader assert len(self.remaining_sharemap) >= k @@ -766,6 +764,7 @@ class Retrieve: block_and_salt, blockhashes, sharehashes = results block, salt = block_and_salt + assert type(block) is str, (block, salt) blockhashes = dict(enumerate(blockhashes)) self.log("the reader gave me the following blockhashes: %s" % \ @@ -838,12 +837,13 @@ class Retrieve: #needed.discard(0) self.log("getting blockhashes for segment %d, share %d: %s" % \ (segnum, reader.shnum, str(needed))) - d1 = reader.get_blockhashes(needed, force_remote=True) + # TODO is force_remote necessary here? + d1 = reader.get_blockhashes(needed, force_remote=False) if self.share_hash_tree.needed_hashes(reader.shnum): need = self.share_hash_tree.needed_hashes(reader.shnum) self.log("also need sharehashes for share %d: %s" % (reader.shnum, str(need))) - d2 = reader.get_sharehashes(need, force_remote=True) + d2 = reader.get_sharehashes(need, force_remote=False) else: d2 = defer.succeed({}) # the logic in the next method # expects a dict diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 130250b2..4ef85c58 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -119,6 +119,7 @@ class ServerMap: self._bad_shares = {} # maps (server,shnum) to old checkstring self._last_update_mode = None self._last_update_time = 0 + self.proxies = {} self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..] # where blockhashes is a list of bytestrings (the result of # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both @@ -631,19 +632,6 @@ class ServermapUpdater: self._servermap.add_problem(f) - def _cache_good_sharedata(self, verinfo, shnum, now, data): - """ - If one of my queries returns successfully (which means that we - were able to and successfully did validate the signature), I - cache the data that we initially fetched from the storage - server. This will help reduce the number of roundtrips that need - to occur when the file is downloaded, or when the file is - updated. - """ - if verinfo: - self._node._add_to_cache(verinfo, shnum, 0, data) - - def _got_results(self, datavs, server, readsize, storage_index, started): lp = self.log(format="got result from [%(name)s], %(numshares)d shares", name=server.get_name(), @@ -675,7 +663,9 @@ class ServermapUpdater: reader = MDMFSlotReadProxy(ss, storage_index, shnum, - data) + data, + data_is_everything=(len(data) < readsize)) + # our goal, with each response, is to validate the version # information and share data as best we can at this point -- # we do this by validating the signature. To do this, we @@ -747,13 +737,21 @@ class ServermapUpdater: d5 = defer.succeed(None) dl = defer.DeferredList([d, d2, d3, d4, d5]) + def _append_proxy(passthrough, shnum=shnum, reader=reader): + # Store the proxy (with its cache) keyed by serverid and + # version. + _, (_,verinfo), _, _, _ = passthrough + verinfo = self._make_verinfo_hashable(verinfo) + self._servermap.proxies[(verinfo, + server.get_serverid(), + storage_index, shnum)] = reader + return passthrough + dl.addCallback(_append_proxy) dl.addBoth(self._turn_barrier) dl.addCallback(lambda results, shnum=shnum: self._got_signature_one_share(results, shnum, server, lp)) dl.addErrback(lambda error, shnum=shnum, data=data: self._got_corrupt_share(error, shnum, server, data, lp)) - dl.addCallback(lambda verinfo, shnum=shnum, data=data: - self._cache_good_sharedata(verinfo, shnum, now, data)) ds.append(dl) # dl is a deferred list that will fire when all of the shares # that we found on this server are done processing. When dl fires, @@ -817,6 +815,10 @@ class ServermapUpdater: return None _, verinfo, signature, __, ___ = results + verinfo = self._make_verinfo_hashable(verinfo[1]) + + # This tuple uniquely identifies a share on the grid; we use it + # to keep track of the ones that we've already seen. (seqnum, root_hash, saltish, @@ -825,22 +827,8 @@ class ServermapUpdater: k, n, prefix, - offsets) = verinfo[1] - offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] ) + offsets_tuple) = verinfo - # XXX: This should be done for us in the method, so - # presumably you can go in there and fix it. - verinfo = (seqnum, - root_hash, - saltish, - segsize, - datalen, - k, - n, - prefix, - offsets_tuple) - # This tuple uniquely identifies a share on the grid; we use it - # to keep track of the ones that we've already seen. if verinfo not in self._valid_versions: # This is a new version tuple, and we need to validate it @@ -879,13 +867,7 @@ class ServermapUpdater: return verinfo - - def _got_update_results_one_share(self, results, share): - """ - I record the update results in results. - """ - assert len(results) == 4 - verinfo, blockhashes, start, end = results + def _make_verinfo_hashable(self, verinfo): (seqnum, root_hash, saltish, @@ -895,10 +877,9 @@ class ServermapUpdater: n, prefix, offsets) = verinfo + offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] ) - # XXX: This should be done for us in the method, so - # presumably you can go in there and fix it. verinfo = (seqnum, root_hash, saltish, @@ -908,7 +889,15 @@ class ServermapUpdater: n, prefix, offsets_tuple) + return verinfo + def _got_update_results_one_share(self, results, share): + """ + I record the update results in results. + """ + assert len(results) == 4 + verinfo, blockhashes, start, end = results + verinfo = self._make_verinfo_hashable(verinfo) update_data = (blockhashes, start, end) self._servermap.set_update_data_for_share_and_verinfo(share, verinfo, diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 08ef4277..67469842 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -43,6 +43,10 @@ class LocalWrapper: self.hung_until = None self.post_call_notifier = None self.disconnectors = {} + self.counter_by_methname = {} + + def _clear_counters(self): + self.counter_by_methname = {} def callRemoteOnly(self, methname, *args, **kwargs): d = self.callRemote(methname, *args, **kwargs) @@ -62,6 +66,8 @@ class LocalWrapper: kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs]) def _really_call(): + def incr(d, k): d[k] = d.setdefault(k, 0) + 1 + incr(self.counter_by_methname, methname) meth = getattr(self.original, "remote_" + methname) return meth(*args, **kwargs) diff --git a/src/allmydata/test/test_dirnode.py b/src/allmydata/test/test_dirnode.py index 187f2536..c65114fb 100644 --- a/src/allmydata/test/test_dirnode.py +++ b/src/allmydata/test/test_dirnode.py @@ -1097,6 +1097,23 @@ class Dirnode(GridTestMixin, unittest.TestCase, d.addCallback(_check_results) return d + def test_deepcheck_cachemisses(self): + self.basedir = "dirnode/Dirnode/test_mdmf_cachemisses" + self.set_up_grid() + d = self._test_deepcheck_create() + # Clear the counters and set the rootnode + d.addCallback(lambda rootnode: + not [ss._clear_counters() for ss + in self.g.wrappers_by_id.values()] or rootnode) + d.addCallback(lambda rootnode: rootnode.start_deep_check().when_done()) + def _check(ign): + count = sum([ss.counter_by_methname['slot_readv'] + for ss in self.g.wrappers_by_id.values()]) + self.failIf(count > 60, 'Expected only 60 cache misses,' + 'unfortunately there were %d' % (count,)) + d.addCallback(_check) + return d + def test_deepcheck_mdmf(self): self.basedir = "dirnode/Dirnode/test_deepcheck_mdmf" self.set_up_grid() diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 9dc1b96f..ee5b768a 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -21,7 +21,7 @@ from allmydata.storage.common import storage_index_to_dir from allmydata.scripts import debug from allmydata.mutable.filenode import MutableFileNode, BackoffAgent -from allmydata.mutable.common import ResponseCache, \ +from allmydata.mutable.common import \ MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \ NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \ NotEnoughServersError, CorruptShareError @@ -639,25 +639,6 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin): d.addCallback(_created) return d - - def test_response_cache_memory_leak(self): - d = self.nodemaker.create_mutable_file("contents") - def _created(n): - d = n.download_best_version() - d.addCallback(lambda res: self.failUnlessEqual(res, "contents")) - d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache))) - - def _check_cache(expected): - # The total size of cache entries should not increase on the second download; - # in fact the cache contents should be identical. - d2 = n.download_best_version() - d2.addCallback(lambda rep: self.failUnlessEqual(repr(n._cache.cache), expected)) - return d2 - d.addCallback(lambda ign: _check_cache(repr(n._cache.cache))) - return d - d.addCallback(_created) - return d - def test_create_with_initial_contents_function(self): data = "initial contents" def _make_contents(n): @@ -1528,15 +1509,6 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin): corrupt_early=False, failure_checker=_check) - def test_corrupt_all_block_hash_tree_late(self): - def _check(res): - f = res[0] - self.failUnless(f.check(NotEnoughSharesError)) - return self._test_corrupt_all("block_hash_tree", - "block hash tree failure", - corrupt_early=False, - failure_checker=_check) - def test_corrupt_all_block_late(self): def _check(res): @@ -1618,17 +1590,20 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin): d.addCallback(lambda ignored: self._test_corrupt_all(("block_hash_tree", 12 * 32), "block hash tree failure", - corrupt_early=False, + corrupt_early=True, should_succeed=False)) return d def test_corrupt_mdmf_block_hash_tree_late(self): + # Note - there is no SDMF counterpart to this test, as the SDMF + # files are guaranteed to have exactly one block, and therefore + # the block hash tree fits within the initial read (#1240). d = self.publish_mdmf() d.addCallback(lambda ignored: self._test_corrupt_all(("block_hash_tree", 12 * 32), "block hash tree failure", - corrupt_early=True, + corrupt_early=False, should_succeed=False)) return d @@ -2233,9 +2208,9 @@ class MultipleEncodings(unittest.TestCase): # then mix up the shares, to make sure that download survives seeing # a variety of encodings. This is actually kind of tricky to set up. - contents1 = "Contents for encoding 1 (3-of-10) go here" - contents2 = "Contents for encoding 2 (4-of-9) go here" - contents3 = "Contents for encoding 3 (4-of-7) go here" + contents1 = "Contents for encoding 1 (3-of-10) go here"*1000 + contents2 = "Contents for encoding 2 (4-of-9) go here"*1000 + contents3 = "Contents for encoding 3 (4-of-7) go here"*1000 # we make a retrieval object that doesn't know what encoding # parameters to use @@ -2403,39 +2378,6 @@ class MultipleVersions(unittest.TestCase, PublishMixin, CheckerMixin): return d -class Utils(unittest.TestCase): - def test_cache(self): - c = ResponseCache() - # xdata = base62.b2a(os.urandom(100))[:100] - xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l" - ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs" - c.add("v1", 1, 0, xdata) - c.add("v1", 1, 2000, ydata) - self.failUnlessEqual(c.read("v2", 1, 10, 11), None) - self.failUnlessEqual(c.read("v1", 2, 10, 11), None) - self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10]) - self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:]) - self.failUnlessEqual(c.read("v1", 1, 300, 10), None) - self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55]) - self.failUnlessEqual(c.read("v1", 1, 0, 101), None) - self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100]) - self.failUnlessEqual(c.read("v1", 1, 100, 1), None) - self.failUnlessEqual(c.read("v1", 1, 1990, 9), None) - self.failUnlessEqual(c.read("v1", 1, 1990, 10), None) - self.failUnlessEqual(c.read("v1", 1, 1990, 11), None) - self.failUnlessEqual(c.read("v1", 1, 1990, 15), None) - self.failUnlessEqual(c.read("v1", 1, 1990, 19), None) - self.failUnlessEqual(c.read("v1", 1, 1990, 20), None) - self.failUnlessEqual(c.read("v1", 1, 1990, 21), None) - self.failUnlessEqual(c.read("v1", 1, 1990, 25), None) - self.failUnlessEqual(c.read("v1", 1, 1999, 25), None) - - # test joining fragments - c = ResponseCache() - c.add("v1", 1, 0, xdata[:10]) - c.add("v1", 1, 10, xdata[10:20]) - self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20]) - class Exceptions(unittest.TestCase): def test_repr(self): nmde = NeedMoreDataError(100, 50, 100) @@ -2443,6 +2385,7 @@ class Exceptions(unittest.TestCase): ucwe = UncoordinatedWriteError() self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe)) + class SameKeyGenerator: def __init__(self, pubkey, privkey): self.pubkey = pubkey @@ -2514,7 +2457,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): self.basedir = "mutable/Problems/test_retrieve_surprise" self.set_up_grid() nm = self.g.clients[0].nodemaker - d = nm.create_mutable_file(MutableData("contents 1")) + d = nm.create_mutable_file(MutableData("contents 1"*4000)) def _created(n): d = defer.succeed(None) d.addCallback(lambda res: n.get_servermap(MODE_READ)) @@ -2528,7 +2471,6 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): # now attempt to retrieve the old version with the old servermap. # This will look like someone has changed the file since we # updated the servermap. - d.addCallback(lambda res: n._cache._clear()) d.addCallback(lambda res: log.msg("starting doomed read")) d.addCallback(lambda res: self.shouldFail(NotEnoughSharesError,