From c2765bd8c6a1dff6e7b3758760dbb9693668b377 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 16 Nov 2007 17:12:33 -0700 Subject: [PATCH] mutable: fix control flow to allow good+bad shares from a peer. Fixes #211. --- src/allmydata/mutable.py | 147 +++++++++++++++++++++------------------ 1 file changed, 79 insertions(+), 68 deletions(-) diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index 6253c51b..3fbe6fb9 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -342,9 +342,9 @@ class Retrieve: d.addCallback(_got_storageserver) d.addCallback(lambda ss: ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])) - d.addCallback(self._got_results, peerid, readsize) - d.addErrback(self._query_failed, peerid, (conn, storage_index, - peer_storage_servers)) + d.addCallback(self._got_results, peerid, readsize, + (conn, storage_index, peer_storage_servers)) + d.addErrback(self._query_failed, peerid) # errors that aren't handled by _query_failed (and errors caused by # _query_failed) get logged, but we still want to check for doneness. d.addErrback(log.err) @@ -355,7 +355,7 @@ class Retrieve: verifier = rsa.create_verifying_key_from_string(pubkey_s) return verifier - def _got_results(self, datavs, peerid, readsize): + def _got_results(self, datavs, peerid, readsize, stuff): self._queries_outstanding.discard(peerid) self._used_peers.add(peerid) if not self._running: @@ -363,77 +363,88 @@ class Retrieve: for shnum,datav in datavs.items(): data = datav[0] - self.log("_got_results: got shnum #%d from peerid %s" - % (shnum, idlib.shortnodeid_b2a(peerid))) - (seqnum, root_hash, IV, k, N, segsize, datalength, - pubkey_s, signature, prefix) = unpack_prefix_and_signature(data) - - if not self._pubkey: - fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) - if fingerprint != self._node._fingerprint: - # bad share - raise CorruptShareError(peerid, shnum, - "pubkey doesn't match fingerprint") - self._pubkey = self._deserialize_pubkey(pubkey_s) - self._node._populate_pubkey(self._pubkey) - - verinfo = (seqnum, root_hash, IV, segsize, datalength) - if verinfo not in self._valid_versions: - # it's a new pair. Verify the signature. - valid = self._pubkey.verify(prefix, signature) - if not valid: - raise CorruptShareError(peerid, shnum, - "signature is invalid") - # ok, it's a valid verinfo. Add it to the list of validated - # versions. - self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d" - % (seqnum, idlib.b2a(root_hash)[:4], - idlib.shortnodeid_b2a(peerid), shnum, - k, N, segsize, datalength)) - self._valid_versions[verinfo] = (prefix, DictOfSets()) - - # and make a note of the other parameters we've just learned - if self._required_shares is None: - self._required_shares = k - self._node._populate_required_shares(k) - if self._total_shares is None: - self._total_shares = N - self._node._populate_total_shares(N) - - # we've already seen this pair, and checked the signature so we - # know it's a valid candidate. Accumulate the share info, if - # there's enough data present. If not, raise NeedMoreDataError, - # which will trigger a re-fetch. - _ignored = unpack_share(data) - self.log(" found enough data to add share contents") - self._valid_versions[verinfo][1].add(shnum, (peerid, data)) - - - def _query_failed(self, f, peerid, stuff): - self._queries_outstanding.discard(peerid) - self._used_peers.add(peerid) + try: + self._got_results_one_share(shnum, data, peerid) + except NeedMoreDataError, e: + # ah, just re-send the query then. + self._read_size = max(self._read_size, e.needed_bytes) + # TODO: for MDMF, sanity-check self._read_size: don't let one + # server cause us to try to read gigabytes of data from all + # other servers. + (conn, storage_index, peer_storage_servers) = stuff + self._do_query(conn, peerid, storage_index, self._read_size, + peer_storage_servers) + return + except CorruptShareError, e: + # log it and give the other shares a chance to be processed + f = failure.Failure() + self.log("WEIRD: bad share: %s %s" % (f, f.value)) + self._bad_peerids.add(peerid) + self._last_failure = f + pass + # all done! + + def _got_results_one_share(self, shnum, data, peerid): + self.log("_got_results: got shnum #%d from peerid %s" + % (shnum, idlib.shortnodeid_b2a(peerid))) + (seqnum, root_hash, IV, k, N, segsize, datalength, + # this might raise NeedMoreDataError, in which case the rest of + # the shares are probably short too. _query_failed() will take + # responsiblity for re-issuing the queries with a new length. + pubkey_s, signature, prefix) = unpack_prefix_and_signature(data) + + if not self._pubkey: + fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) + if fingerprint != self._node._fingerprint: + raise CorruptShareError(peerid, shnum, + "pubkey doesn't match fingerprint") + self._pubkey = self._deserialize_pubkey(pubkey_s) + self._node._populate_pubkey(self._pubkey) + + verinfo = (seqnum, root_hash, IV, segsize, datalength) + if verinfo not in self._valid_versions: + # it's a new pair. Verify the signature. + valid = self._pubkey.verify(prefix, signature) + if not valid: + raise CorruptShareError(peerid, shnum, + "signature is invalid") + # ok, it's a valid verinfo. Add it to the list of validated + # versions. + self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d" + % (seqnum, idlib.b2a(root_hash)[:4], + idlib.shortnodeid_b2a(peerid), shnum, + k, N, segsize, datalength)) + self._valid_versions[verinfo] = (prefix, DictOfSets()) + + # and make a note of the other parameters we've just learned + if self._required_shares is None: + self._required_shares = k + self._node._populate_required_shares(k) + if self._total_shares is None: + self._total_shares = N + self._node._populate_total_shares(N) + + # we've already seen this pair, and checked the signature so we + # know it's a valid candidate. Accumulate the share info, if + # there's enough data present. If not, raise NeedMoreDataError, + # which will trigger a re-fetch. + _ignored = unpack_share(data) + self.log(" found enough data to add share contents") + self._valid_versions[verinfo][1].add(shnum, (peerid, data)) + + + def _query_failed(self, f, peerid): if not self._running: return - if f.check(NeedMoreDataError): - # ah, just re-send the query then. - self._read_size = max(self._read_size, f.value.needed_bytes) - (conn, storage_index, peer_storage_servers) = stuff - self._do_query(conn, peerid, storage_index, self._read_size, - peer_storage_servers) - return + self._queries_outstanding.discard(peerid) + self._used_peers.add(peerid) self._last_failure = f self._bad_peerids.add(peerid) - short_sid = idlib.b2a(self._storage_index)[:6] - if f.check(CorruptShareError): - self.log("WEIRD: bad share for %s: %s %s" % (short_sid, f, - f.value)) - else: - self.log("WEIRD: other error for %s: %s %s" % (short_sid, f, - f.value)) + self.log("WEIRD: error during query: %s %s" % (f, f.value)) def _check_for_done(self, res): if not self._running: - self.log("UNUSUAL: _check_for_done but we're not running") + self.log("ODD: _check_for_done but we're not running") return share_prefixes = {} versionmap = DictOfSets() -- 2.45.2