From: Brian Warner Date: Tue, 22 Apr 2008 00:27:50 +0000 (-0700) Subject: mutable read: enable the cache (written during mapupdate, read during retrieve).... X-Git-Tag: allmydata-tahoe-1.1.0~201 X-Git-Url: https://git.rkrishnan.org/vdrive/$sch_link?a=commitdiff_plain;h=6af124dc3e0645918842cf1430cb0f8ab5a7e057;p=tahoe-lafs%2Ftahoe-lafs.git mutable read: enable the cache (written during mapupdate, read during retrieve). This speeds up small-file reads by about 30% over a link with an average 25ms RTT --- diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 00663f4c..66a049a4 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -4,7 +4,7 @@ from itertools import count from zope.interface import implements from twisted.internet import defer from twisted.python import failure -from foolscap.eventual import eventually +from foolscap.eventual import eventually, fireEventually from allmydata.interfaces import IRetrieveStatus from allmydata.util import hashutil, idlib, log from allmydata import hashtree, codec, storage @@ -179,20 +179,20 @@ class Retrieve: # ask the cache first got_from_cache = False - datav = [] - #for (offset, length) in readv: - # (data, timestamp) = self._node._cache.read(self.verinfo, shnum, - # offset, length) - # if data is not None: - # datav.append(data) - if len(datav) == len(readv): + datavs = [] + for (offset, length) in readv: + (data, timestamp) = self._node._cache.read(self.verinfo, shnum, + offset, length) + if data is not None: + datavs.append(data) + if len(datavs) == len(readv): self.log("got data from cache") got_from_cache = True - d = defer.succeed(datav) + d = fireEventually({shnum: datavs}) + # datavs is a dict mapping shnum to a pair of strings else: - self.remaining_sharemap[shnum].remove(peerid) d = self._do_read(ss, peerid, self._storage_index, [shnum], readv) - d.addCallback(self._fill_cache, readv) + self.remaining_sharemap.discard(shnum, peerid) d.addCallback(self._got_results, m, peerid, started, got_from_cache) d.addErrback(self._query_failed, m, peerid) @@ -212,15 +212,6 @@ class Retrieve: d.addErrback(log.err) return d # purely for testing convenience - def _fill_cache(self, datavs, readv): - timestamp = time.time() - for shnum,datav in datavs.items(): - for i, (offset, length) in enumerate(readv): - data = datav[i] - self._node._cache.add(self.verinfo, shnum, offset, data, - timestamp) - return datavs - def _do_read(self, ss, peerid, storage_index, shnums, readv): # isolate the callRemote to a separate method, so tests can subclass # Publish and override it @@ -482,7 +473,8 @@ class Retrieve: self._status.timings["total"] = time.time() - self._started # res is either the new contents, or a Failure if isinstance(res, failure.Failure): - self.log("Retrieve done, with failure", failure=res) + self.log("Retrieve done, with failure", failure=res, + level=log.UNUSUAL) self._status.set_status("Failed") else: self.log("Retrieve done, success!") diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 79a25dd8..5d90501a 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -511,6 +511,7 @@ class ServermapUpdater: verinfo = self._got_results_one_share(shnum, data, peerid) last_verinfo = verinfo last_shnum = shnum + self._node._cache.add(verinfo, shnum, 0, data, now) except CorruptShareError, e: # log it and give the other shares a chance to be processed f = failure.Failure()