self._bad_shares = {} # maps (server,shnum) to old checkstring
self._last_update_mode = None
self._last_update_time = 0
+ self.proxies = {}
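+ # maps (verinfo, serverid, storage_index, shnum) to the
+ # MDMFSlotReadProxy that fetched that share during the servermap
+ # update; its cached data lets later reads avoid extra roundtrips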
self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
# where blockhashes is a list of bytestrings (the result of
# layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
self._servermap.add_problem(f)
- def _cache_good_sharedata(self, verinfo, shnum, now, data):
- """
- If one of my queries returns successfully (which means that we
- were able to and successfully did validate the signature), I
- cache the data that we initially fetched from the storage
- server. This will help reduce the number of roundtrips that need
- to occur when the file is downloaded, or when the file is
- updated.
- """
- if verinfo:
- self._node._add_to_cache(verinfo, shnum, 0, data)
-
-
def _got_results(self, datavs, server, readsize, storage_index, started):
lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
name=server.get_name(),
reader = MDMFSlotReadProxy(ss,
storage_index,
shnum,
- data)
+ data,
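+ # the response was shorter than our read, so `data` must be the
+ # entire share; the proxy can then serve later reads from it
+ # without going back to the server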
+ data_is_everything=(len(data) < readsize))
+
# our goal, with each response, is to validate the version
# information and share data as best we can at this point --
# we do this by validating the signature. To do this, we
d5 = defer.succeed(None)
dl = defer.DeferredList([d, d2, d3, d4, d5])
+ def _append_proxy(passthrough, shnum=shnum, reader=reader):
+ # Store the read proxy (with its cached data) in the servermap,
+ # keyed by version, serverid, storage index, and share number.
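+ # passthrough is the DeferredList result: a list of
+ # (success, value) pairs, one per query above; the second value
+ # is the verinfo we just fetched.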
+ _, (_,verinfo), _, _, _ = passthrough
+ verinfo = self._make_verinfo_hashable(verinfo)
+ self._servermap.proxies[(verinfo,
+ server.get_serverid(),
+ storage_index, shnum)] = reader
+ return passthrough
+ dl.addCallback(_append_proxy)
dl.addBoth(self._turn_barrier)
dl.addCallback(lambda results, shnum=shnum:
self._got_signature_one_share(results, shnum, server, lp))
dl.addErrback(lambda error, shnum=shnum, data=data:
self._got_corrupt_share(error, shnum, server, data, lp))
- dl.addCallback(lambda verinfo, shnum=shnum, data=data:
- self._cache_good_sharedata(verinfo, shnum, now, data))
ds.append(dl)
# dl is a deferred list that will fire when all of the shares
# that we found on this server are done processing. When dl fires,
return None
_, verinfo, signature, __, ___ = results
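+ # verinfo is a (success, value) pair from the DeferredList; turn
+ # the value into a hashable tuple so it can be used in dict keys
+ # and membership tests.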
+ verinfo = self._make_verinfo_hashable(verinfo[1])
+
+ # This tuple uniquely identifies a version of the file on the grid;
+ # we use it to keep track of the ones that we've already seen.
(seqnum,
root_hash,
saltish,
k,
n,
prefix,
- offsets) = verinfo[1]
- offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
+ offsets_tuple) = verinfo
- # XXX: This should be done for us in the method, so
- # presumably you can go in there and fix it.
- verinfo = (seqnum,
- root_hash,
- saltish,
- segsize,
- datalen,
- k,
- n,
- prefix,
- offsets_tuple)
- # This tuple uniquely identifies a share on the grid; we use it
- # to keep track of the ones that we've already seen.
if verinfo not in self._valid_versions:
# This is a new version tuple, and we need to validate it
return verinfo
-
- def _got_update_results_one_share(self, results, share):
- """
- I record the update results in results.
- """
- assert len(results) == 4
- verinfo, blockhashes, start, end = results
+ def _make_verinfo_hashable(self, verinfo):
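+ # verinfo ends in a dict of offsets; replace it with a tuple of
+ # (key, value) pairs so the whole verinfo is hashable and can be
+ # used as a dict key or checked for set membership.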
(seqnum,
root_hash,
saltish,
n,
prefix,
offsets) = verinfo
+
offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
- # XXX: This should be done for us in the method, so
- # presumably you can go in there and fix it.
verinfo = (seqnum,
root_hash,
saltish,
n,
prefix,
offsets_tuple)
+ return verinfo
+ def _got_update_results_one_share(self, results, share):
+ """
+ I record the update results in results.
+ """
+ assert len(results) == 4
+ verinfo, blockhashes, start, end = results
+ verinfo = self._make_verinfo_hashable(verinfo)
update_data = (blockhashes, start, end)
self._servermap.set_update_data_for_share_and_verinfo(share,
verinfo,