X-Git-Url: https://git.rkrishnan.org/?a=blobdiff_plain;f=src%2Fallmydata%2Fmutable%2Fservermap.py;h=4ef85c583f1f60d86ba6610b405346cb49e6bd26;hb=4563ba456b1b2d640551956374b0cfbfc602dcec;hp=130250b29c81407abe22e8048666c6fdb0ad79f4;hpb=861892983369c0e96dc1e73420c1d9609724d752;p=tahoe-lafs%2Ftahoe-lafs.git diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 130250b2..4ef85c58 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -119,6 +119,7 @@ class ServerMap: self._bad_shares = {} # maps (server,shnum) to old checkstring self._last_update_mode = None self._last_update_time = 0 + self.proxies = {} self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..] # where blockhashes is a list of bytestrings (the result of # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both @@ -631,19 +632,6 @@ class ServermapUpdater: self._servermap.add_problem(f) - def _cache_good_sharedata(self, verinfo, shnum, now, data): - """ - If one of my queries returns successfully (which means that we - were able to and successfully did validate the signature), I - cache the data that we initially fetched from the storage - server. This will help reduce the number of roundtrips that need - to occur when the file is downloaded, or when the file is - updated. - """ - if verinfo: - self._node._add_to_cache(verinfo, shnum, 0, data) - - def _got_results(self, datavs, server, readsize, storage_index, started): lp = self.log(format="got result from [%(name)s], %(numshares)d shares", name=server.get_name(), @@ -675,7 +663,9 @@ class ServermapUpdater: reader = MDMFSlotReadProxy(ss, storage_index, shnum, - data) + data, + data_is_everything=(len(data) < readsize)) + # our goal, with each response, is to validate the version # information and share data as best we can at this point -- # we do this by validating the signature. To do this, we @@ -747,13 +737,21 @@ class ServermapUpdater: d5 = defer.succeed(None) dl = defer.DeferredList([d, d2, d3, d4, d5]) + def _append_proxy(passthrough, shnum=shnum, reader=reader): + # Store the proxy (with its cache) keyed by serverid and + # version. + _, (_,verinfo), _, _, _ = passthrough + verinfo = self._make_verinfo_hashable(verinfo) + self._servermap.proxies[(verinfo, + server.get_serverid(), + storage_index, shnum)] = reader + return passthrough + dl.addCallback(_append_proxy) dl.addBoth(self._turn_barrier) dl.addCallback(lambda results, shnum=shnum: self._got_signature_one_share(results, shnum, server, lp)) dl.addErrback(lambda error, shnum=shnum, data=data: self._got_corrupt_share(error, shnum, server, data, lp)) - dl.addCallback(lambda verinfo, shnum=shnum, data=data: - self._cache_good_sharedata(verinfo, shnum, now, data)) ds.append(dl) # dl is a deferred list that will fire when all of the shares # that we found on this server are done processing. When dl fires, @@ -817,6 +815,10 @@ class ServermapUpdater: return None _, verinfo, signature, __, ___ = results + verinfo = self._make_verinfo_hashable(verinfo[1]) + + # This tuple uniquely identifies a share on the grid; we use it + # to keep track of the ones that we've already seen. (seqnum, root_hash, saltish, @@ -825,22 +827,8 @@ class ServermapUpdater: k, n, prefix, - offsets) = verinfo[1] - offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] ) + offsets_tuple) = verinfo - # XXX: This should be done for us in the method, so - # presumably you can go in there and fix it. - verinfo = (seqnum, - root_hash, - saltish, - segsize, - datalen, - k, - n, - prefix, - offsets_tuple) - # This tuple uniquely identifies a share on the grid; we use it - # to keep track of the ones that we've already seen. if verinfo not in self._valid_versions: # This is a new version tuple, and we need to validate it @@ -879,13 +867,7 @@ class ServermapUpdater: return verinfo - - def _got_update_results_one_share(self, results, share): - """ - I record the update results in results. - """ - assert len(results) == 4 - verinfo, blockhashes, start, end = results + def _make_verinfo_hashable(self, verinfo): (seqnum, root_hash, saltish, @@ -895,10 +877,9 @@ class ServermapUpdater: n, prefix, offsets) = verinfo + offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] ) - # XXX: This should be done for us in the method, so - # presumably you can go in there and fix it. verinfo = (seqnum, root_hash, saltish, @@ -908,7 +889,15 @@ class ServermapUpdater: n, prefix, offsets_tuple) + return verinfo + def _got_update_results_one_share(self, results, share): + """ + I record the update results in results. + """ + assert len(results) == 4 + verinfo, blockhashes, start, end = results + verinfo = self._make_verinfo_hashable(verinfo) update_data = (blockhashes, start, end) self._servermap.set_update_data_for_share_and_verinfo(share, verinfo,