"""
assert self._active_readers, "No more active readers"
- ds = []
new_readers = set(self._active_readers) - self._validated_readers
self.log('validating %d newly-added active readers' % len(new_readers))
for reader in new_readers:
- # We force a remote read here -- otherwise, we are relying
- # on cached data that we already verified as valid, and we
- # won't detect an uncoordinated write that has occurred
- # since the last servermap update.
- d = reader.get_prefix(force_remote=True)
- d.addCallback(self._try_to_validate_prefix, reader)
- ds.append(d)
- dl = defer.DeferredList(ds, consumeErrors=True)
- def _check_results(results):
- # Each result in results will be of the form (success, msg).
- # We don't care about msg, but success will tell us whether
- # or not the checkstring validated. If it didn't, we need to
- # remove the offending (peer,share) from our active readers,
- # and ensure that active readers is again populated.
- bad_readers = []
- for i, result in enumerate(results):
- if not result[0]:
- reader = self._active_readers[i]
- f = result[1]
- assert isinstance(f, failure.Failure)
-
- self.log("The reader %s failed to "
- "properly validate: %s" % \
- (reader, str(f.value)))
- bad_readers.append((reader, f))
- else:
- reader = self._active_readers[i]
- self.log("the reader %s checks out, so we'll use it" % \
- reader)
- self._validated_readers.add(reader)
- # Each time we validate a reader, we check to see if
- # we need the private key. If we do, we politely ask
- # for it and then continue computing. If we find
- # that we haven't gotten it at the end of
- # segment decoding, then we'll take more drastic
- # measures.
- if self._need_privkey and not self._node.is_readonly():
- d = reader.get_encprivkey()
- d.addCallback(self._try_to_validate_privkey, reader)
- if bad_readers:
- # We do them all at once, or else we screw up list indexing.
- for (reader, f) in bad_readers:
- self._mark_bad_share(reader, f)
- if self._verify:
- if len(self._active_readers) >= self._required_shares:
- return self._download_current_segment()
- else:
- return self._failed()
- else:
- return self._add_active_peers()
- else:
- return self._download_current_segment()
- # The next step will assert that it has enough active
- # readers to fetch shares; we just need to remove it.
- dl.addCallback(_check_results)
- return dl
+ self._validated_readers.add(reader)
+ # Each time we validate a reader, we check to see if we need the
+ # private key. If we do, we politely ask for it and then continue
+ # computing. If we find that we haven't gotten it at the end of
+ # segment decoding, then we'll take more drastic measures.
+ if self._need_privkey and not self._node.is_readonly():
+ d = reader.get_encprivkey()
+ d.addCallback(self._try_to_validate_privkey, reader)
+ # XXX: don't just drop the Deferred. We need error-reporting
+ # but not flow-control here.
+ return self._download_current_segment()
def _try_to_validate_prefix(self, prefix, reader):