d.addCallback(_got_storageserver)
d.addCallback(lambda ss: ss.callRemote("slot_readv", storage_index,
[], [(0, readsize)]))
- d.addCallback(self._got_results, peerid, readsize)
- d.addErrback(self._query_failed, peerid, (conn, storage_index,
- peer_storage_servers))
+ d.addCallback(self._got_results, peerid, readsize,
+ (conn, storage_index, peer_storage_servers))
+ d.addErrback(self._query_failed, peerid)
# errors that aren't handled by _query_failed (and errors caused by
# _query_failed) get logged, but we still want to check for doneness.
d.addErrback(log.err)
verifier = rsa.create_verifying_key_from_string(pubkey_s)
return verifier
- def _got_results(self, datavs, peerid, readsize):
+ def _got_results(self, datavs, peerid, readsize, stuff):
self._queries_outstanding.discard(peerid)
self._used_peers.add(peerid)
if not self._running:
for shnum,datav in datavs.items():
data = datav[0]
- self.log("_got_results: got shnum #%d from peerid %s"
- % (shnum, idlib.shortnodeid_b2a(peerid)))
- (seqnum, root_hash, IV, k, N, segsize, datalength,
- pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
-
- if not self._pubkey:
- fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
- if fingerprint != self._node._fingerprint:
- # bad share
- raise CorruptShareError(peerid, shnum,
- "pubkey doesn't match fingerprint")
- self._pubkey = self._deserialize_pubkey(pubkey_s)
- self._node._populate_pubkey(self._pubkey)
-
- verinfo = (seqnum, root_hash, IV, segsize, datalength)
- if verinfo not in self._valid_versions:
- # it's a new pair. Verify the signature.
- valid = self._pubkey.verify(prefix, signature)
- if not valid:
- raise CorruptShareError(peerid, shnum,
- "signature is invalid")
- # ok, it's a valid verinfo. Add it to the list of validated
- # versions.
- self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
- % (seqnum, idlib.b2a(root_hash)[:4],
- idlib.shortnodeid_b2a(peerid), shnum,
- k, N, segsize, datalength))
- self._valid_versions[verinfo] = (prefix, DictOfSets())
-
- # and make a note of the other parameters we've just learned
- if self._required_shares is None:
- self._required_shares = k
- self._node._populate_required_shares(k)
- if self._total_shares is None:
- self._total_shares = N
- self._node._populate_total_shares(N)
-
- # we've already seen this pair, and checked the signature so we
- # know it's a valid candidate. Accumulate the share info, if
- # there's enough data present. If not, raise NeedMoreDataError,
- # which will trigger a re-fetch.
- _ignored = unpack_share(data)
- self.log(" found enough data to add share contents")
- self._valid_versions[verinfo][1].add(shnum, (peerid, data))
-
-
- def _query_failed(self, f, peerid, stuff):
- self._queries_outstanding.discard(peerid)
- self._used_peers.add(peerid)
+ try:
+ self._got_results_one_share(shnum, data, peerid)
+ except NeedMoreDataError, e:
+ # ah, just re-send the query then.
+ self._read_size = max(self._read_size, e.needed_bytes)
+ # TODO: for MDMF, sanity-check self._read_size: don't let one
+ # server cause us to try to read gigabytes of data from all
+ # other servers.
+ (conn, storage_index, peer_storage_servers) = stuff
+ self._do_query(conn, peerid, storage_index, self._read_size,
+ peer_storage_servers)
+ return
+ except CorruptShareError, e:
+ # log it and give the other shares a chance to be processed
+ f = failure.Failure()
+ self.log("WEIRD: bad share: %s %s" % (f, f.value))
+ self._bad_peerids.add(peerid)
+ self._last_failure = f
+ pass
+ # all done!
+
+ def _got_results_one_share(self, shnum, data, peerid):
+ self.log("_got_results: got shnum #%d from peerid %s"
+ % (shnum, idlib.shortnodeid_b2a(peerid)))
+ (seqnum, root_hash, IV, k, N, segsize, datalength,
+ # this might raise NeedMoreDataError, in which case the rest of
+ # the shares are probably short too. _query_failed() will take
+ # responsiblity for re-issuing the queries with a new length.
+ pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
+
+ if not self._pubkey:
+ fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+ if fingerprint != self._node._fingerprint:
+ raise CorruptShareError(peerid, shnum,
+ "pubkey doesn't match fingerprint")
+ self._pubkey = self._deserialize_pubkey(pubkey_s)
+ self._node._populate_pubkey(self._pubkey)
+
+ verinfo = (seqnum, root_hash, IV, segsize, datalength)
+ if verinfo not in self._valid_versions:
+ # it's a new pair. Verify the signature.
+ valid = self._pubkey.verify(prefix, signature)
+ if not valid:
+ raise CorruptShareError(peerid, shnum,
+ "signature is invalid")
+ # ok, it's a valid verinfo. Add it to the list of validated
+ # versions.
+ self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
+ % (seqnum, idlib.b2a(root_hash)[:4],
+ idlib.shortnodeid_b2a(peerid), shnum,
+ k, N, segsize, datalength))
+ self._valid_versions[verinfo] = (prefix, DictOfSets())
+
+ # and make a note of the other parameters we've just learned
+ if self._required_shares is None:
+ self._required_shares = k
+ self._node._populate_required_shares(k)
+ if self._total_shares is None:
+ self._total_shares = N
+ self._node._populate_total_shares(N)
+
+ # we've already seen this pair, and checked the signature so we
+ # know it's a valid candidate. Accumulate the share info, if
+ # there's enough data present. If not, raise NeedMoreDataError,
+ # which will trigger a re-fetch.
+ _ignored = unpack_share(data)
+ self.log(" found enough data to add share contents")
+ self._valid_versions[verinfo][1].add(shnum, (peerid, data))
+
+
+ def _query_failed(self, f, peerid):
if not self._running:
return
- if f.check(NeedMoreDataError):
- # ah, just re-send the query then.
- self._read_size = max(self._read_size, f.value.needed_bytes)
- (conn, storage_index, peer_storage_servers) = stuff
- self._do_query(conn, peerid, storage_index, self._read_size,
- peer_storage_servers)
- return
+ self._queries_outstanding.discard(peerid)
+ self._used_peers.add(peerid)
self._last_failure = f
self._bad_peerids.add(peerid)
- short_sid = idlib.b2a(self._storage_index)[:6]
- if f.check(CorruptShareError):
- self.log("WEIRD: bad share for %s: %s %s" % (short_sid, f,
- f.value))
- else:
- self.log("WEIRD: other error for %s: %s %s" % (short_sid, f,
- f.value))
+ self.log("WEIRD: error during query: %s %s" % (f, f.value))
def _check_for_done(self, res):
if not self._running:
- self.log("UNUSUAL: _check_for_done but we're not running")
+ self.log("ODD: _check_for_done but we're not running")
return
share_prefixes = {}
versionmap = DictOfSets()