mutable read: enable the cache (written during mapupdate, read during retrieve)....
author     Brian Warner <warner@allmydata.com>
           Tue, 22 Apr 2008 00:27:50 +0000 (17:27 -0700)
committer  Brian Warner <warner@allmydata.com>
           Tue, 22 Apr 2008 00:27:50 +0000 (17:27 -0700)
src/allmydata/mutable/retrieve.py
src/allmydata/mutable/servermap.py

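For context: the diff below assumes a per-node share cache reachable as self._node._cache, whose read(verinfo, shnum, offset, length) returns a (data, timestamp) pair (data is None on a miss) and whose add(verinfo, shnum, offset, data, timestamp) stores a blob. A minimal sketch of such a cache follows; the ShareCache name and the keep-only-the-latest-add, exact-containment behavior are illustrative assumptions, not the real tahoe-lafs class:

    # Minimal sketch of the cache interface this commit relies on. The class
    # name and single-entry-per-share policy are assumptions for clarity.
    class ShareCache:
        def __init__(self):
            # (verinfo, shnum) -> (offset, data, timestamp) from the last add()
            self._entries = {}

        def add(self, verinfo, shnum, offset, data, timestamp):
            self._entries[(verinfo, shnum)] = (offset, data, timestamp)

        def read(self, verinfo, shnum, offset, length):
            entry = self._entries.get((verinfo, shnum))
            if entry is None:
                return (None, None)
            (c_offset, c_data, timestamp) = entry
            start = offset - c_offset
            if start < 0 or start + length > len(c_data):
                return (None, None)  # span not fully covered by the cache
            return (c_data[start:start+length], timestamp)
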
diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index 00663f4c15742124df564a76c7e5abad379c06e8..66a049a4841f6074d324c6bd03b927437b799f73 100644
--- a/src/allmydata/mutable/retrieve.py
+++ b/src/allmydata/mutable/retrieve.py
@@ -4,7 +4,7 @@ from itertools import count
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.python import failure
-from foolscap.eventual import eventually
+from foolscap.eventual import eventually, fireEventually
 from allmydata.interfaces import IRetrieveStatus
 from allmydata.util import hashutil, idlib, log
 from allmydata import hashtree, codec, storage
@@ -179,20 +179,20 @@ class Retrieve:
 
         # ask the cache first
         got_from_cache = False
-        datav = []
-        #for (offset, length) in readv:
-        #    (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
-        #                                               offset, length)
-        #    if data is not None:
-        #        datav.append(data)
-        if len(datav) == len(readv):
+        datavs = []
+        for (offset, length) in readv:
+            (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
+                                                       offset, length)
+            if data is not None:
+                datavs.append(data)
+        if len(datavs) == len(readv):
             self.log("got data from cache")
             got_from_cache = True
-            d = defer.succeed(datav)
+            d = fireEventually({shnum: datavs})
+            # datavs is a dict mapping shnum to a pair of strings
         else:
-            self.remaining_sharemap[shnum].remove(peerid)
             d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
-            d.addCallback(self._fill_cache, readv)
+        self.remaining_sharemap.discard(shnum, peerid)
 
         d.addCallback(self._got_results, m, peerid, started, got_from_cache)
         d.addErrback(self._query_failed, m, peerid)
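
Note on the cache-hit path above: even when every span is served from the cache, the caller still expects a Deferred, and fireEventually makes it fire on a later reactor turn rather than synchronously, presumably so cache hits do not run the result callbacks reentrantly inside the query loop. A rough stand-in built from Twisted primitives (foolscap's real version uses its eventual-send queue, so this is an approximation):

    # Approximation of foolscap.eventual.fireEventually: a Deferred that
    # fires with value on a later reactor turn instead of immediately.
    from twisted.internet import defer, reactor

    def fire_eventually(value=None):
        d = defer.Deferred()
        reactor.callLater(0, d.callback, value)
        return d
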
@@ -212,15 +212,6 @@ class Retrieve:
         d.addErrback(log.err)
         return d # purely for testing convenience
 
-    def _fill_cache(self, datavs, readv):
-        timestamp = time.time()
-        for shnum,datav in datavs.items():
-            for i, (offset, length) in enumerate(readv):
-                data = datav[i]
-                self._node._cache.add(self.verinfo, shnum, offset, data,
-                                      timestamp)
-        return datavs
-
     def _do_read(self, ss, peerid, storage_index, shnums, readv):
         # isolate the callRemote to a separate method, so tests can subclass
         # Publish and override it
@@ -482,7 +473,8 @@ class Retrieve:
         self._status.timings["total"] = time.time() - self._started
         # res is either the new contents, or a Failure
         if isinstance(res, failure.Failure):
-            self.log("Retrieve done, with failure", failure=res)
+            self.log("Retrieve done, with failure", failure=res,
+                     level=log.UNUSUAL)
             self._status.set_status("Failed")
         else:
             self.log("Retrieve done, success!")
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index 79a25dd8436b65f5db813442807f69709b66929b..5d90501a15dfbce8270db4cde8708417a3669984 100644
--- a/src/allmydata/mutable/servermap.py
+++ b/src/allmydata/mutable/servermap.py
@@ -511,6 +511,7 @@ class ServermapUpdater:
                 verinfo = self._got_results_one_share(shnum, data, peerid)
                 last_verinfo = verinfo
                 last_shnum = shnum
+                self._node._cache.add(verinfo, shnum, 0, data, now)
             except CorruptShareError, e:
                 # log it and give the other shares a chance to be processed
                 f = failure.Failure()
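
Taken together, the two halves hand off through the cache: the ServermapUpdater hunk above add()s the whole share blob it just fetched (at offset 0), and Retrieve later tries to satisfy each (offset, length) span of its readv from that blob, falling back to a real _do_read only on a miss. An end-to-end sketch using the ShareCache stand-in from earlier; every concrete value here is made up for illustration:

    # Illustrative mapupdate-writes / retrieve-reads handoff. Relies on the
    # ShareCache sketch above; verinfo, shnum, and the share data are dummies.
    import time

    cache = ShareCache()
    verinfo = ("dummy-verinfo",)
    shnum = 0

    # mapupdate side: cache the data just fetched from a peer, at offset 0
    share_data = "x" * 2000
    cache.add(verinfo, shnum, 0, share_data, time.time())

    # retrieve side: try to satisfy every requested span from the cache
    readv = [(0, 100), (1500, 200)]
    datavs = []
    for (offset, length) in readv:
        (data, timestamp) = cache.read(verinfo, shnum, offset, length)
        if data is not None:
            datavs.append(data)
    got_from_cache = (len(datavs) == len(readv))  # True: both spans covered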