+ if self._read_length:
+ # our end segment is the last segment containing part of the
+ # byte range that we were asked to read.
+ self.log("got read length %d" % self._read_length)
+ end_data = self._offset + self._read_length
+ end = mathutil.div_ceil(end_data,
+ self._segment_size)
+ end -= 1
+ assert end < self._num_segments
+ self._last_segment = end
+ self.log("got end segment: %d" % self._last_segment)
+ else:
+ self._last_segment = self._num_segments - 1
+
+ self._current_segment = self._start_segment
+
def _add_active_peers(self):
    """
    I populate self._active_readers with enough active readers to
    retrieve the contents of this mutable file. I am called before
    downloading starts, and (eventually) after each validation
    error, connection error, or other problem in the download.

    When self._verify is set I try to add up to self._total_shares
    readers; otherwise I add only as many as are needed to bring
    self._active_readers up to self._required_shares.

    I return the result of self._failed() if (outside of verify mode)
    there are not enough candidate shares left, and the result of
    self._validate_active_prefixes() otherwise.
    """
    # TODO: It would be cool to investigate other heuristics for
    # reader selection. For instance, the cost (in time the user
    # spends waiting for their file) of selecting a really slow peer
    # that happens to have a primary share is probably more than
    # selecting a really fast peer that doesn't have a primary
    # share. Maybe the servermap could be extended to provide this
    # information; it could keep track of latency information while
    # it gathers more important data, and then this routine could
    # use that to select active readers.
    #
    # (these and other questions would be easier to answer with a
    # robust, configurable tahoe-lafs simulator, which modeled node
    # failures, differences in node speed, and other characteristics
    # that we expect storage servers to have. You could have
    # presets for really stable grids (like allmydata.com),
    # friendnets, make it easy to configure your own settings, and
    # then simulate the effect of big changes on these use cases
    # instead of just reasoning about what the effect might be. Out
    # of scope for MDMF, though.)

    # We need at least self._required_shares readers to download a
    # segment.
    if self._verify:
        needed = self._total_shares
    else:
        needed = self._required_shares - len(self._active_readers)
    # XXX: Why don't format= log messages work here?
    self.log("adding %d peers to the active peers list" % needed)

    # We shouldn't consider adding shares that we already have; this
    # will cause problems later.
    shnums_in_use = set(reader.shnum for reader in self._active_readers)
    candidate_shnums = set(self.remaining_sharemap.keys()) - shnums_in_use

    # We favor lower numbered shares, since FEC is faster with
    # primary shares than with other shares, and lower-numbered
    # shares are more likely to be primary than higher numbered
    # shares. Sets are unordered, so the sort has to happen *after*
    # the set arithmetic, immediately before we take the first
    # `needed` entries.
    active_shnums = sorted(candidate_shnums)[:needed]
    if len(active_shnums) < needed and not self._verify:
        # We don't have enough readers to retrieve the file; fail.
        return self._failed()

    for shnum in active_shnums:
        self._active_readers.append(self.readers[shnum])
        self.log("added reader for share %d" % shnum)
    assert len(self._active_readers) >= self._required_shares
    # Conceptually, this is part of the _add_active_peers step. It
    # validates the prefixes of newly added readers to make sure
    # that they match what we are expecting for self.verinfo. If
    # validation is successful, _validate_active_prefixes will call
    # _download_current_segment for us. If validation is
    # unsuccessful, then _validate_active_prefixes will remove the
    # peer and call _add_active_peers again, where we will attempt to
    # rectify the problem by choosing another peer.
    return self._validate_active_prefixes()
+
+
def _validate_active_prefixes(self):
    """
    I check to make sure that the prefixes on the peers that I am
    currently reading from match the prefix that we want to see, as
    said in self.verinfo.

    If I find that all of the active peers have acceptable prefixes,
    I pass control to _download_current_segment, which will use
    those peers to do cool things. If I find that some of the active
    peers have unacceptable prefixes, I will remove them from active
    peers (and from further consideration) and call
    _add_active_peers to attempt to rectify the situation. I keep
    track of which peers I have already validated so that I don't
    need to do so again.

    I return a Deferred (a DeferredList with a callback attached)
    that fires with the result of whichever next step was taken.
    """
    assert self._active_readers, "No more active readers"

    # Freeze the readers that still need validation in an ordered
    # list. DeferredList reports results in the order its deferreds
    # were supplied, so each result has to be paired with the reader
    # at the same position in *this* list. Indexing into
    # self._active_readers would be wrong: it also holds readers that
    # were validated earlier, so positions do not line up.
    new_readers = []
    for reader in self._active_readers:
        if reader in self._validated_readers or reader in new_readers:
            continue
        new_readers.append(reader)
    self.log('validating %d newly-added active readers' % len(new_readers))

    ds = []
    for reader in new_readers:
        # We force a remote read here -- otherwise, we are relying
        # on cached data that we already verified as valid, and we
        # won't detect an uncoordinated write that has occurred
        # since the last servermap update.
        d = reader.get_prefix(force_remote=True)
        d.addCallback(self._try_to_validate_prefix, reader)
        ds.append(d)
    dl = defer.DeferredList(ds, consumeErrors=True)

    def _check_results(results):
        # Each result in results will be of the form (success, msg).
        # We don't care about msg, but success will tell us whether
        # or not the checkstring validated. If it didn't, we need to
        # remove the offending (peer,share) from our active readers,
        # and ensure that active readers is again populated.
        #
        # NOTE(review): only a failure (success == False) marks a
        # reader as bad here; a falsy return value from
        # _try_to_validate_prefix would still count as validated.
        # Confirm that it signals a mismatch by raising.
        bad_readers = []
        for reader, (success, value) in zip(new_readers, results):
            if not success:
                assert isinstance(value, failure.Failure)

                self.log("The reader %s failed to "
                         "properly validate: %s" %
                         (reader, str(value.value)))
                bad_readers.append((reader, value))
            else:
                self.log("the reader %s checks out, so we'll use it" %
                         reader)
                self._validated_readers.add(reader)
                # Each time we validate a reader, we check to see if
                # we need the private key. If we do, we politely ask
                # for it and then continue computing. If we find
                # that we haven't gotten it at the end of
                # segment decoding, then we'll take more drastic
                # measures.
                if self._need_privkey and not self._node.is_readonly():
                    privkey_d = reader.get_encprivkey()
                    privkey_d.addCallback(self._try_to_validate_privkey,
                                          reader)
        if bad_readers:
            # We collect the bad readers first and mark them all at
            # once, after every result has been matched to its reader.
            for (reader, f) in bad_readers:
                self._mark_bad_share(reader, f)
            if self._verify:
                if len(self._active_readers) >= self._required_shares:
                    return self._download_current_segment()
                else:
                    return self._failed()
            else:
                return self._add_active_peers()
        else:
            return self._download_current_segment()
        # The next step will assert that it has enough active
        # readers to fetch shares; we just need to remove it.
    dl.addCallback(_check_results)
    return dl
+
+
+ def _try_to_validate_prefix(self, prefix, reader):
+ """
+ I check that the prefix returned by a candidate server for
+ retrieval matches the prefix that the servermap knows about
+ (and, hence, the prefix that was validated earlier). If it does,
+ I return True, which means that I approve of the use of the
+ candidate server for segment retrieval. If it doesn't, I return
+ False, which means that another server must be chosen.
+ """
+ (seqnum,
+ root_hash,
+ IV,
+ segsize,
+ datalength,
+ k,
+ N,
+ known_prefix,