-from allmydata.util.spans import DataSpans
-
MODE_CHECK = "MODE_CHECK" # query all peers
MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version
MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial
class UnknownVersionError(BadShareError):
"""The share we received was of a version we don't recognize."""
-
-class ResponseCache:
- """I cache share data, to reduce the number of round trips used during
- mutable file operations. All of the data in my cache is for a single
- storage index, but I will keep information on multiple shares for
- that storage index.
-
- I maintain a highest-seen sequence number, and will flush all entries
- each time this number increases (this doesn't necessarily imply that
- all entries have the same sequence number).
-
- My cache is indexed by a (verinfo, shnum) tuple.
-
- My cache entries are DataSpans instances, each representing a set of
- non-overlapping byteranges.
- """
-
- def __init__(self):
- self.cache = {}
- self.seqnum = None
-
- def _clear(self):
- # also used by unit tests
- self.cache = {}
-
- def add(self, verinfo, shnum, offset, data):
- seqnum = verinfo[0]
- if seqnum > self.seqnum:
- self._clear()
- self.seqnum = seqnum
-
- index = (verinfo, shnum)
- if index in self.cache:
- self.cache[index].add(offset, data)
- else:
- spans = DataSpans()
- spans.add(offset, data)
- self.cache[index] = spans
-
- def read(self, verinfo, shnum, offset, length):
- """Try to satisfy a read request from cache.
- Returns data, or None if the cache did not hold the entire requested span.
- """
-
- # TODO: perhaps return a DataSpans object representing the fragments
- # that we have, instead of only returning a hit if we can satisfy the
- # whole request from cache.
-
- index = (verinfo, shnum)
- if index in self.cache:
- return self.cache[index].get(offset, length)
- else:
- return None
from allmydata.mutable.publish import Publish, MutableData,\
TransformingUploadable
from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
- ResponseCache, UncoordinatedWriteError
+ UncoordinatedWriteError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
self._required_shares = default_encoding_parameters["k"]
self._total_shares = default_encoding_parameters["n"]
self._sharemap = {} # known shares, shnum-to-[nodeids]
- self._cache = ResponseCache()
self._most_recent_size = None
# filled in after __init__ if we're being created for the first time;
# filled in by the servermap updater before publishing, otherwise.
self._privkey = privkey
def _populate_encprivkey(self, encprivkey):
self._encprivkey = encprivkey
- def _add_to_cache(self, verinfo, shnum, offset, data):
- self._cache.add(verinfo, shnum, offset, data)
- def _read_from_cache(self, verinfo, shnum, offset, length):
- return self._cache.read(verinfo, shnum, offset, length)
def get_write_enabler(self, server):
seed = server.get_foolscap_write_enabler_seed()
rref,
storage_index,
shnum,
- data=""):
+ data="",
+ data_is_everything=False):
# Start the initialization process.
self._rref = rref
self._storage_index = storage_index
# If the user has chosen to initialize us with some data, we'll
# try to satisfy subsequent data requests with that data before
- # asking the storage server for it. If
+ # asking the storage server for it.
self._data = data
+
+ # If the provided data is known to be complete, then we know there's
+ # nothing to be gained by querying the server, so we satisfy requests
+ # from what we have, even if a read extends past the end of the data.
+ self._data_is_everything = data_is_everything
+
# Callers who interact with the cache in the filenode pass None
# when there isn't any cached data, but the way we index the
# cached data requires a string, so convert None to "".
# TODO: It's entirely possible to tweak this so that it just
# fulfills the requests that it can, and not demand that all
# requests are satisfiable before running it.
- if not unsatisfiable and not force_remote:
+
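+ # Serve the read locally either when every requested range lies
+ # within self._data, or when self._data is known to be the whole
+ # share (in which case the server has nothing more to give us).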
+ if not unsatisfiable or self._data_is_everything:
results = [self._data[offset:offset+length]
for (offset, length) in readvs]
results = {self.shnum: results}
self.remaining_sharemap = DictOfSets()
for (shnum, server, timestamp) in shares:
self.remaining_sharemap.add(shnum, server)
- # If the servermap update fetched anything, it fetched at least 1
- # KiB, so we ask for that much.
- # TODO: Change the cache methods to allow us to fetch all of the
- # data that they have, then change this method to do that.
- any_cache = self._node._read_from_cache(self.verinfo, shnum,
- 0, 1000)
- reader = MDMFSlotReadProxy(server.get_rref(),
- self._storage_index,
- shnum,
- any_cache)
+ # Reuse the SlotReader from the servermap.
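+ # The proxy built during the servermap update still holds the data
+ # it fetched, so reusing it here avoids another round trip.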
+ key = (self.verinfo, server.get_serverid(),
+ self._storage_index, shnum)
+ if key in self.servermap.proxies:
+ reader = self.servermap.proxies[key]
+ else:
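+ # No proxy was saved for this share during the servermap update,
+ # so build a fresh reader that will fetch from the server.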
+ reader = MDMFSlotReadProxy(server.get_rref(),
+ self._storage_index, shnum, None)
reader.server = server
self.readers[shnum] = reader
assert len(self.remaining_sharemap) >= k
block_and_salt, blockhashes, sharehashes = results
block, salt = block_and_salt
+ assert type(block) is str, (block, salt)
blockhashes = dict(enumerate(blockhashes))
self.log("the reader gave me the following blockhashes: %s" % \
#needed.discard(0)
self.log("getting blockhashes for segment %d, share %d: %s" % \
(segnum, reader.shnum, str(needed)))
- d1 = reader.get_blockhashes(needed, force_remote=True)
+ # TODO: is force_remote necessary here?
+ d1 = reader.get_blockhashes(needed, force_remote=False)
if self.share_hash_tree.needed_hashes(reader.shnum):
need = self.share_hash_tree.needed_hashes(reader.shnum)
self.log("also need sharehashes for share %d: %s" % (reader.shnum,
str(need)))
- d2 = reader.get_sharehashes(need, force_remote=True)
+ d2 = reader.get_sharehashes(need, force_remote=False)
else:
d2 = defer.succeed({}) # the logic in the next method
# expects a dict
self._bad_shares = {} # maps (server,shnum) to old checkstring
self._last_update_mode = None
self._last_update_time = 0
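+ # maps (verinfo, serverid, storage_index, shnum) to the
+ # MDMFSlotReadProxy (with its cached data) built during the update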
+ self.proxies = {}
self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
# where blockhashes is a list of bytestrings (the result of
# layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
self._servermap.add_problem(f)
- def _cache_good_sharedata(self, verinfo, shnum, now, data):
- """
- If one of my queries returns successfully (which means that we
- were able to and successfully did validate the signature), I
- cache the data that we initially fetched from the storage
- server. This will help reduce the number of roundtrips that need
- to occur when the file is downloaded, or when the file is
- updated.
- """
- if verinfo:
- self._node._add_to_cache(verinfo, shnum, 0, data)
-
-
def _got_results(self, datavs, server, readsize, storage_index, started):
lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
name=server.get_name(),
reader = MDMFSlotReadProxy(ss,
storage_index,
shnum,
- data)
+ data,
+ data_is_everything=(len(data) < readsize))
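+ # If the server returned fewer bytes than we asked for, we must have
+ # read the entire share, so the reader never needs to go back to the
+ # server for more data.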
+
# our goal, with each response, is to validate the version
# information and share data as best we can at this point --
# we do this by validating the signature. To do this, we
d5 = defer.succeed(None)
dl = defer.DeferredList([d, d2, d3, d4, d5])
+ def _append_proxy(passthrough, shnum=shnum, reader=reader):
+ # Store the proxy (with its cache) keyed by verinfo, serverid,
+ # storage index, and share number.
+ _, (_,verinfo), _, _, _ = passthrough
+ verinfo = self._make_verinfo_hashable(verinfo)
+ self._servermap.proxies[(verinfo,
+ server.get_serverid(),
+ storage_index, shnum)] = reader
+ return passthrough
+ dl.addCallback(_append_proxy)
dl.addBoth(self._turn_barrier)
dl.addCallback(lambda results, shnum=shnum:
self._got_signature_one_share(results, shnum, server, lp))
dl.addErrback(lambda error, shnum=shnum, data=data:
self._got_corrupt_share(error, shnum, server, data, lp))
- dl.addCallback(lambda verinfo, shnum=shnum, data=data:
- self._cache_good_sharedata(verinfo, shnum, now, data))
ds.append(dl)
# dl is a deferred list that will fire when all of the shares
# that we found on this server are done processing. When dl fires,
return None
_, verinfo, signature, __, ___ = results
+ verinfo = self._make_verinfo_hashable(verinfo[1])
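+ # (_make_verinfo_hashable turns the offsets dict into a tuple so
+ # that verinfo can be used as a dict key and set member)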
+
+ # This tuple uniquely identifies a share on the grid; we use it
+ # to keep track of the ones that we've already seen.
(seqnum,
root_hash,
saltish,
k,
n,
prefix,
- offsets) = verinfo[1]
- offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
+ offsets_tuple) = verinfo
- # XXX: This should be done for us in the method, so
- # presumably you can go in there and fix it.
- verinfo = (seqnum,
- root_hash,
- saltish,
- segsize,
- datalen,
- k,
- n,
- prefix,
- offsets_tuple)
- # This tuple uniquely identifies a share on the grid; we use it
- # to keep track of the ones that we've already seen.
if verinfo not in self._valid_versions:
# This is a new version tuple, and we need to validate it
return verinfo
-
- def _got_update_results_one_share(self, results, share):
- """
- I record the update results in results.
- """
- assert len(results) == 4
- verinfo, blockhashes, start, end = results
+ def _make_verinfo_hashable(self, verinfo):
(seqnum,
root_hash,
saltish,
n,
prefix,
offsets) = verinfo
+
offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
- # XXX: This should be done for us in the method, so
- # presumably you can go in there and fix it.
verinfo = (seqnum,
root_hash,
saltish,
n,
prefix,
offsets_tuple)
+ return verinfo
+ def _got_update_results_one_share(self, results, share):
+ """
+ I record the update results for this share in the servermap.
+ """
+ assert len(results) == 4
+ verinfo, blockhashes, start, end = results
+ verinfo = self._make_verinfo_hashable(verinfo)
update_data = (blockhashes, start, end)
self._servermap.set_update_data_for_share_and_verinfo(share,
verinfo,
self.hung_until = None
self.post_call_notifier = None
self.disconnectors = {}
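+ # maps remote method name to the number of times it has been called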
+ self.counter_by_methname = {}
+
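+ # called by tests to reset the per-method call counts between operations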
+ def _clear_counters(self):
+ self.counter_by_methname = {}
def callRemoteOnly(self, methname, *args, **kwargs):
d = self.callRemote(methname, *args, **kwargs)
kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
def _really_call():
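+ # Record this call so tests can count how many remote operations
+ # (e.g. slot_readv) were actually issued.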
+ def incr(d, k): d[k] = d.setdefault(k, 0) + 1
+ incr(self.counter_by_methname, methname)
meth = getattr(self.original, "remote_" + methname)
return meth(*args, **kwargs)
d.addCallback(_check_results)
return d
+ def test_deepcheck_cachemisses(self):
+ self.basedir = "dirnode/Dirnode/test_mdmf_cachemisses"
+ self.set_up_grid()
+ d = self._test_deepcheck_create()
+ # Clear the call counters on every server wrapper, then pass the
+ # rootnode along to the next callback
+ def _reset_counters(rootnode):
+     for ss in self.g.wrappers_by_id.values():
+         ss._clear_counters()
+     return rootnode
+ d.addCallback(_reset_counters)
+ d.addCallback(lambda rootnode: rootnode.start_deep_check().when_done())
+ def _check(ign):
+ count = sum([ss.counter_by_methname.get('slot_readv', 0)
+              for ss in self.g.wrappers_by_id.values()])
+ self.failIf(count > 60, 'Expected at most 60 cache misses, '
+             'unfortunately there were %d' % (count,))
+ d.addCallback(_check)
+ return d
+
def test_deepcheck_mdmf(self):
self.basedir = "dirnode/Dirnode/test_deepcheck_mdmf"
self.set_up_grid()
from allmydata.scripts import debug
from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
-from allmydata.mutable.common import ResponseCache, \
+from allmydata.mutable.common import \
MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
NotEnoughServersError, CorruptShareError
d.addCallback(_created)
return d
-
- def test_response_cache_memory_leak(self):
- d = self.nodemaker.create_mutable_file("contents")
- def _created(n):
- d = n.download_best_version()
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents"))
- d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache)))
-
- def _check_cache(expected):
- # The total size of cache entries should not increase on the second download;
- # in fact the cache contents should be identical.
- d2 = n.download_best_version()
- d2.addCallback(lambda rep: self.failUnlessEqual(repr(n._cache.cache), expected))
- return d2
- d.addCallback(lambda ign: _check_cache(repr(n._cache.cache)))
- return d
- d.addCallback(_created)
- return d
-
def test_create_with_initial_contents_function(self):
data = "initial contents"
def _make_contents(n):
corrupt_early=False,
failure_checker=_check)
- def test_corrupt_all_block_hash_tree_late(self):
- def _check(res):
- f = res[0]
- self.failUnless(f.check(NotEnoughSharesError))
- return self._test_corrupt_all("block_hash_tree",
- "block hash tree failure",
- corrupt_early=False,
- failure_checker=_check)
-
def test_corrupt_all_block_late(self):
def _check(res):
d.addCallback(lambda ignored:
self._test_corrupt_all(("block_hash_tree", 12 * 32),
"block hash tree failure",
- corrupt_early=False,
+ corrupt_early=True,
should_succeed=False))
return d
def test_corrupt_mdmf_block_hash_tree_late(self):
+ # Note - there is no SDMF counterpart to this test, as the SDMF
+ # files are guaranteed to have exactly one block, and therefore
+ # the block hash tree fits within the initial read (#1240).
d = self.publish_mdmf()
d.addCallback(lambda ignored:
self._test_corrupt_all(("block_hash_tree", 12 * 32),
"block hash tree failure",
- corrupt_early=True,
+ corrupt_early=False,
should_succeed=False))
return d
# then mix up the shares, to make sure that download survives seeing
# a variety of encodings. This is actually kind of tricky to set up.
- contents1 = "Contents for encoding 1 (3-of-10) go here"
- contents2 = "Contents for encoding 2 (4-of-9) go here"
- contents3 = "Contents for encoding 3 (4-of-7) go here"
+ contents1 = "Contents for encoding 1 (3-of-10) go here"*1000
+ contents2 = "Contents for encoding 2 (4-of-9) go here"*1000
+ contents3 = "Contents for encoding 3 (4-of-7) go here"*1000
# we make a retrieval object that doesn't know what encoding
# parameters to use
return d
-class Utils(unittest.TestCase):
- def test_cache(self):
- c = ResponseCache()
- # xdata = base62.b2a(os.urandom(100))[:100]
- xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
- ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
- c.add("v1", 1, 0, xdata)
- c.add("v1", 1, 2000, ydata)
- self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
- self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
- self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
- self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
- self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
- self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
- self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
- self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
- self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
- self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)
-
- # test joining fragments
- c = ResponseCache()
- c.add("v1", 1, 0, xdata[:10])
- c.add("v1", 1, 10, xdata[10:20])
- self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])
-
class Exceptions(unittest.TestCase):
def test_repr(self):
nmde = NeedMoreDataError(100, 50, 100)
ucwe = UncoordinatedWriteError()
self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
+
class SameKeyGenerator:
def __init__(self, pubkey, privkey):
self.pubkey = pubkey
self.basedir = "mutable/Problems/test_retrieve_surprise"
self.set_up_grid()
nm = self.g.clients[0].nodemaker
- d = nm.create_mutable_file(MutableData("contents 1"))
+ d = nm.create_mutable_file(MutableData("contents 1"*4000))
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
# now attempt to retrieve the old version with the old servermap.
# This will look like someone has changed the file since we
# updated the servermap.
- d.addCallback(lambda res: n._cache._clear())
d.addCallback(lambda res: log.msg("starting doomed read"))
d.addCallback(lambda res:
self.shouldFail(NotEnoughSharesError,