]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Cloud backend: change the ChunkCache's replacement policy to LRU. fixes #1885
authorDaira Hopwood <david-sarah@jacaranda.org>
Tue, 28 May 2013 21:30:08 +0000 (22:30 +0100)
committerDaira Hopwood <daira@jacaranda.org>
Wed, 9 Apr 2014 00:57:51 +0000 (01:57 +0100)
Signed-off-by: Daira Hopwood <david-sarah@jacaranda.org>
src/allmydata/storage/backends/cloud/cloud_common.py
src/allmydata/storage/backends/cloud/immutable.py
src/allmydata/storage/backends/cloud/mutable.py

index fda61e666379b57f25c02d76789b0f07424e5798..90fec844e64d090fd8b58d5d48101f20b9c9844f 100644 (file)
@@ -38,7 +38,8 @@ def get_chunk_key(share_key, chunknum):
 
 DEFAULT_PREFERRED_CHUNK_SIZE = 512*1024
 PREFERRED_CHUNK_SIZE = DEFAULT_PREFERRED_CHUNK_SIZE
-PIPELINE_DEPTH = 4
+PIPELINE_DEPTH = 5
+CACHED_CHUNKS = 5
 
 ZERO_CHUNKDATA = "\x00"*PREFERRED_CHUNK_SIZE
 
@@ -574,38 +575,34 @@ class BackpressurePipeline(object):
 class ChunkCache(object):
     """I cache chunks for a specific share object."""
 
-    def __init__(self, container, key, chunksize, nchunks=1, initial_cachemap={}):
+    def __init__(self, container, key, chunksize, cached_chunks=CACHED_CHUNKS, initial_cachemap={}):
         self._container = container
         self._key = key
         self._chunksize = chunksize
-        self._nchunks = nchunks
+        self._cached_chunks = cached_chunks
 
         # chunknum -> deferred data
         self._cachemap = initial_cachemap
+        self._lru = deque(sorted(initial_cachemap.keys()))
         self._pipeline = BackpressurePipeline(PIPELINE_DEPTH)
 
-    def set_nchunks(self, nchunks):
-        self._nchunks = nchunks
-
     def _load_chunk(self, chunknum, chunkdata_d):
         d = self._container.get_object(get_chunk_key(self._key, chunknum))
         eventual_chain(source=d, target=chunkdata_d)
         return d
 
+    def _discard(self):
+        while len(self._lru) > self._cached_chunks:
+            self.flush_chunk(self._lru.popleft())
+
     def get(self, chunknum, result_d):
         if chunknum in self._cachemap:
             # cache hit; never stall
+            self._lru.remove(chunknum)  # takes O(cached_chunks) time, but that's fine
+            self._lru.append(chunknum)
             eventual_chain(source=self._cachemap[chunknum], target=result_d)
             return defer.succeed(None)
 
-        # Evict any chunks other than the first and last two, until there are
-        # three or fewer chunks left cached.
-        for candidate_chunknum in self._cachemap.keys():
-            if len(self._cachemap) <= 3:
-                break
-            if candidate_chunknum not in (0, self._nchunks-2, self._nchunks-1):
-                self.flush_chunk(candidate_chunknum)
-
         # cache miss; stall when the pipeline is full
         chunkdata_d = defer.Deferred()
         d = self._pipeline.add(1, self._load_chunk, chunknum, chunkdata_d)
@@ -614,6 +611,8 @@ class ChunkCache(object):
             return res
         chunkdata_d.addCallback(_check)
         self._cachemap[chunknum] = chunkdata_d
+        self._lru.append(chunknum)
+        self._discard()
         eventual_chain(source=chunkdata_d, target=result_d)
         return d
 
index 41f0dbb33d09ce4e4afe79bf398eb5fec896908f..8091da28d3949386462b92e08148d564c175e5ef 100644 (file)
@@ -9,7 +9,6 @@ from zope.interface import implements
 from allmydata.interfaces import IShareForReading, IShareForWriting
 
 from allmydata.util.assertutil import precondition, _assert
-from allmydata.util.mathutil import div_ceil
 from allmydata.storage.common import CorruptStoredShareError, UnknownImmutableContainerVersionError, \
      DataTooLargeError
 from allmydata.storage.backends.cloud import cloud_common
@@ -162,9 +161,8 @@ class ImmutableCloudShareForReading(CloudShareBase, ImmutableCloudShareMixin, Cl
 
         self._total_size = total_size
         self._chunksize = chunksize
-        nchunks = div_ceil(total_size, chunksize)
         initial_cachemap = {0: defer.succeed(first_chunkdata)}
-        self._cache = ChunkCache(container, self._key, chunksize, nchunks, initial_cachemap)
+        self._cache = ChunkCache(container, self._key, chunksize, initial_cachemap=initial_cachemap)
         #print "ImmutableCloudShareForReading", total_size, chunksize, self._key
 
         header = first_chunkdata[:self.HEADER_SIZE]
index 2314f52fe16f09297945e5fe767b5e0d08dc994e..7c9a2ead86c14cc66d9739f9398140e7dacc568e 100644 (file)
@@ -123,7 +123,6 @@ class MutableCloudShare(CloudShareBase, CloudShareReaderMixin):
     def _set_total_size(self, total_size):
         self._total_size = total_size
         self._nchunks = div_ceil(self._total_size, self._chunksize)
-        self._cache.set_nchunks(self._nchunks)
 
     def log(self, *args, **kwargs):
         if self.parent: