from allmydata.util import idlib
-from allmydata.util.dictutil import DictOfSets
+from allmydata.util.spans import DataSpans
MODE_CHECK = "MODE_CHECK" # query all peers
MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version
class ResponseCache:
"""I cache share data, to reduce the number of round trips used during
mutable file operations. All of the data in my cache is for a single
- storage index, but I will keep information on multiple shares (and
- multiple versions) for that storage index.
+ storage index, but I will keep information on multiple shares for
+ that storage index.
+
+ I maintain a highest-seen sequence number, and flush all entries
+ whenever that number increases. (This does not guarantee that every
+ entry shares the same sequence number, since entries for older
+ versions may still be added after a flush.)
My cache is indexed by a (verinfo, shnum) tuple.
- My cache entries contain a set of non-overlapping byteranges: (start,
- data, timestamp) tuples.
+ My cache entries are DataSpans instances, each representing a set of
+ non-overlapping byteranges.
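+
+ A rough usage sketch (here 'verinfo' stands in for the full
+ version-info tuple, whose first element is the sequence number):
+
+ c = ResponseCache()
+ c.add(verinfo, 1, 0, "abcdef")
+ c.read(verinfo, 1, 0, 4)    # -> "abcd"
+ c.read(verinfo, 1, 0, 100)  # -> None (span not fully cached)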
"""
def __init__(self):
- self.cache = DictOfSets()
+ self.cache = {}
+ self.seqnum = None
def _clear(self):
- # used by unit tests
- self.cache = DictOfSets()
-
- def _does_overlap(self, x_start, x_length, y_start, y_length):
- if x_start < y_start:
- x_start, y_start = y_start, x_start
- x_length, y_length = y_length, x_length
- x_end = x_start + x_length
- y_end = y_start + y_length
- # this just returns a boolean. Eventually we'll want a form that
- # returns a range.
- if not x_length:
- return False
- if not y_length:
- return False
- if x_start >= y_end:
- return False
- if y_start >= x_end:
- return False
- return True
-
-
- def _inside(self, x_start, x_length, y_start, y_length):
- x_end = x_start + x_length
- y_end = y_start + y_length
- if x_start < y_start:
- return False
- if x_start >= y_end:
- return False
- if x_end < y_start:
- return False
- if x_end > y_end:
- return False
- return True
-
- def add(self, verinfo, shnum, offset, data, timestamp):
+ # also used by unit tests
+ self.cache = {}
+
+ def add(self, verinfo, shnum, offset, data):
+ seqnum = verinfo[0]
+ # a higher sequence number invalidates everything we have cached for
+ # older versions, so flush the whole cache
+ if self.seqnum is None or seqnum > self.seqnum:
+ self._clear()
+ self.seqnum = seqnum
+
index = (verinfo, shnum)
- self.cache.add(index, (offset, data, timestamp) )
+ if index in self.cache:
+ self.cache[index].add(offset, data)
+ else:
+ spans = DataSpans()
+ spans.add(offset, data)
+ self.cache[index] = spans
def read(self, verinfo, shnum, offset, length):
"""Try to satisfy a read request from cache.
- Returns (data, timestamp), or (None, None) if the cache did not hold
- the requested data.
+ Returns data, or None if the cache did not hold the entire
+ requested span.
"""
- # TODO: join multiple fragments, instead of only returning a hit if
- # we have a fragment that contains the whole request
+ # TODO: perhaps return a DataSpans object representing the fragments
+ # that we have, instead of only returning a hit if we can satisfy the
+ # whole request from cache.
index = (verinfo, shnum)
- for entry in self.cache.get(index, set()):
- (e_start, e_data, e_timestamp) = entry
- if self._inside(offset, length, e_start, len(e_data)):
- want_start = offset - e_start
- want_end = offset+length - e_start
- return (e_data[want_start:want_end], e_timestamp)
- return None, None
-
-
+ if index in self.cache:
+ return self.cache[index].get(offset, length)
+ else:
+ return None
d.addCallback(lambda res: self.failUnlessEqual(res, "contents"))
d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache)))
- def _check_cache_size(expected):
- # The total size of cache entries should not increase on the second download.
+ def _check_cache(expected):
+ # The total size of cache entries should not increase on the second download;
+ # in fact the cache contents should be identical.
d2 = n.download_best_version()
- d2.addCallback(lambda ign: self.failUnlessEqual(len(repr(n._cache.cache)), expected))
+ d2.addCallback(lambda ign: self.failUnlessEqual(repr(n._cache.cache), expected))
return d2
- d.addCallback(lambda ign: _check_cache_size(len(repr(n._cache.cache))))
+ d.addCallback(lambda ign: _check_cache(repr(n._cache.cache)))
return d
d.addCallback(_created)
return d
- test_response_cache_memory_leak.todo = "This isn't fixed (see #1045)."
def test_create_with_initial_contents_function(self):
data = "initial contents"
class Utils(unittest.TestCase):
- def _do_inside(self, c, x_start, x_length, y_start, y_length):
- # we compare this against sets of integers
- x = set(range(x_start, x_start+x_length))
- y = set(range(y_start, y_start+y_length))
- should_be_inside = x.issubset(y)
- self.failUnlessEqual(should_be_inside, c._inside(x_start, x_length,
- y_start, y_length),
- str((x_start, x_length, y_start, y_length)))
-
- def test_cache_inside(self):
- c = ResponseCache()
- x_start = 10
- x_length = 5
- for y_start in range(8, 17):
- for y_length in range(8):
- self._do_inside(c, x_start, x_length, y_start, y_length)
-
- def _do_overlap(self, c, x_start, x_length, y_start, y_length):
- # we compare this against sets of integers
- x = set(range(x_start, x_start+x_length))
- y = set(range(y_start, y_start+y_length))
- overlap = bool(x.intersection(y))
- self.failUnlessEqual(overlap, c._does_overlap(x_start, x_length,
- y_start, y_length),
- str((x_start, x_length, y_start, y_length)))
-
- def test_cache_overlap(self):
- c = ResponseCache()
- x_start = 10
- x_length = 5
- for y_start in range(8, 17):
- for y_length in range(8):
- self._do_overlap(c, x_start, x_length, y_start, y_length)
-
def test_cache(self):
c = ResponseCache()
# xdata = base62.b2a(os.urandom(100))[:100]
xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
- nope = (None, None)
- c.add("v1", 1, 0, xdata, "time0")
- c.add("v1", 1, 2000, ydata, "time1")
- self.failUnlessEqual(c.read("v2", 1, 10, 11), nope)
- self.failUnlessEqual(c.read("v1", 2, 10, 11), nope)
- self.failUnlessEqual(c.read("v1", 1, 0, 10), (xdata[:10], "time0"))
- self.failUnlessEqual(c.read("v1", 1, 90, 10), (xdata[90:], "time0"))
- self.failUnlessEqual(c.read("v1", 1, 300, 10), nope)
- self.failUnlessEqual(c.read("v1", 1, 2050, 5), (ydata[50:55], "time1"))
- self.failUnlessEqual(c.read("v1", 1, 0, 101), nope)
- self.failUnlessEqual(c.read("v1", 1, 99, 1), (xdata[99:100], "time0"))
- self.failUnlessEqual(c.read("v1", 1, 100, 1), nope)
- self.failUnlessEqual(c.read("v1", 1, 1990, 9), nope)
- self.failUnlessEqual(c.read("v1", 1, 1990, 10), nope)
- self.failUnlessEqual(c.read("v1", 1, 1990, 11), nope)
- self.failUnlessEqual(c.read("v1", 1, 1990, 15), nope)
- self.failUnlessEqual(c.read("v1", 1, 1990, 19), nope)
- self.failUnlessEqual(c.read("v1", 1, 1990, 20), nope)
- self.failUnlessEqual(c.read("v1", 1, 1990, 21), nope)
- self.failUnlessEqual(c.read("v1", 1, 1990, 25), nope)
- self.failUnlessEqual(c.read("v1", 1, 1999, 25), nope)
-
- # optional: join fragments
+ c.add("v1", 1, 0, xdata)
+ c.add("v1", 1, 2000, ydata)
+ self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
+ self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
+ self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
+ self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
+ self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
+ self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
+ self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
+ self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
+ self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
+ self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
+ self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
+ self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
+ self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
+ self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
+ self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
+ self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
+ self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
+ self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)
+
+ # test joining fragments
c = ResponseCache()
- c.add("v1", 1, 0, xdata[:10], "time0")
- c.add("v1", 1, 10, xdata[10:20], "time1")
- #self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0"))
+ c.add("v1", 1, 0, xdata[:10])
+ c.add("v1", 1, 10, xdata[10:20])
+ self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])
class Exceptions(unittest.TestCase):
def test_repr(self):