# Default preferred size of one chunk: 512 * 1024 (presumably bytes -- confirm).
DEFAULT_PREFERRED_CHUNK_SIZE = 512*1024
# Chunk size actually in effect; starts out equal to the default.
PREFERRED_CHUNK_SIZE = DEFAULT_PREFERRED_CHUNK_SIZE
-PIPELINE_DEPTH = 4
# Passed to BackpressurePipeline(...) in ChunkCache.__init__.
+PIPELINE_DEPTH = 5
# Default for ChunkCache's `cached_chunks` argument, the bound that
# ChunkCache._discard() enforces on the LRU list.
+CACHED_CHUNKS = 5
# A string of PREFERRED_CHUNK_SIZE NUL characters, built once when this module
# is loaded; rebinding PREFERRED_CHUNK_SIZE afterwards does not resize it.
ZERO_CHUNKDATA = "\x00"*PREFERRED_CHUNK_SIZE
class ChunkCache(object):
"""I cache chunks for a specific share object."""
- def __init__(self, container, key, chunksize, nchunks=1, initial_cachemap={}):
+ def __init__(self, container, key, chunksize, cached_chunks=CACHED_CHUNKS, initial_cachemap={}):
self._container = container
self._key = key
self._chunksize = chunksize
- self._nchunks = nchunks
+ self._cached_chunks = cached_chunks
# chunknum -> deferred data
self._cachemap = initial_cachemap
+ self._lru = deque(sorted(initial_cachemap.keys()))
self._pipeline = BackpressurePipeline(PIPELINE_DEPTH)
- def set_nchunks(self, nchunks):
- self._nchunks = nchunks
-
def _load_chunk(self, chunknum, chunkdata_d):
# Ask the container for the object stored under this share's key for chunk
# `chunknum`, and hand the outcome to `chunkdata_d` through eventual_chain.
# Returns the object returned by get_object().
# NOTE(review): presumably get_object() returns a Deferred and
# eventual_chain forwards its result or failure to `target` -- confirm
# against those helpers, which are not visible here.
d = self._container.get_object(get_chunk_key(self._key, chunknum))
eventual_chain(source=d, target=chunkdata_d)
return d
+ def _discard(self):
# Trim the LRU list down to at most self._cached_chunks entries. get()
# appends a chunk number on the right each time it is used, so the left
# end holds the least recently used chunk and is flushed first.
# NOTE(review): flush_chunk() is not visible here; presumably it removes
# the chunk from self._cachemap -- confirm.
+ while len(self._lru) > self._cached_chunks:
+ self.flush_chunk(self._lru.popleft())
+
def get(self, chunknum, result_d):
# Deliver chunk `chunknum` to `result_d` (via eventual_chain), using the
# cached entry when there is one and otherwise scheduling a load through
# the pipeline.
# Returns defer.succeed(None) on a cache hit, or whatever
# self._pipeline.add(...) returns on a miss.
if chunknum in self._cachemap:
# cache hit; never stall
# Move this chunk to the most-recently-used (right) end of the LRU list.
+ self._lru.remove(chunknum) # takes O(cached_chunks) time, but that's fine
+ self._lru.append(chunknum)
eventual_chain(source=self._cachemap[chunknum], target=result_d)
return defer.succeed(None)
- # Evict any chunks other than the first and last two, until there are
- # three or fewer chunks left cached.
- for candidate_chunknum in self._cachemap.keys():
- if len(self._cachemap) <= 3:
- break
- if candidate_chunknum not in (0, self._nchunks-2, self._nchunks-1):
- self.flush_chunk(candidate_chunknum)
-
# cache miss; stall when the pipeline is full
chunkdata_d = defer.Deferred()
d = self._pipeline.add(1, self._load_chunk, chunknum, chunkdata_d)
# NOTE(review): the definition of `_check` (the function the following
# `return res` belongs to) is not visible in this excerpt.
return res
chunkdata_d.addCallback(_check)
# Record the pending chunk, mark it most recently used, then evict the
# oldest entries if the LRU list now exceeds its bound.
self._cachemap[chunknum] = chunkdata_d
+ self._lru.append(chunknum)
+ self._discard()
eventual_chain(source=chunkdata_d, target=result_d)
return d
from allmydata.interfaces import IShareForReading, IShareForWriting
from allmydata.util.assertutil import precondition, _assert
-from allmydata.util.mathutil import div_ceil
from allmydata.storage.common import CorruptStoredShareError, UnknownImmutableContainerVersionError, \
DataTooLargeError
from allmydata.storage.backends.cloud import cloud_common
self._total_size = total_size
self._chunksize = chunksize
- nchunks = div_ceil(total_size, chunksize)
initial_cachemap = {0: defer.succeed(first_chunkdata)}
- self._cache = ChunkCache(container, self._key, chunksize, nchunks, initial_cachemap)
+ self._cache = ChunkCache(container, self._key, chunksize, initial_cachemap=initial_cachemap)
#print "ImmutableCloudShareForReading", total_size, chunksize, self._key
header = first_chunkdata[:self.HEADER_SIZE]