from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually, DeadReferenceError, \
RemoteException
+
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
DownloadStopped, MDMF_VERSION, SDMF_VERSION
+from allmydata.util.assertutil import _assert
from allmydata.util import hashutil, log, mathutil, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
# verify means that we are using the downloader logic to verify all
# of our shares. This tells the downloader a few things.
- #
+ #
# 1. We need to download all of the shares.
# 2. We don't need to decode or decrypt the shares, since our
# caller doesn't care about the plaintext, only the
self._done_deferred = defer.Deferred()
self._offset = offset
self._read_length = size
- self._setup_download()
self._setup_encoding_parameters()
+ self._setup_download()
self.log("starting download")
self._started_fetching = time.time()
# The download process beyond this is a state machine.
self._storage_index, shnum, None)
reader.server = server
self.readers[shnum] = reader
- assert len(self.remaining_sharemap) >= k
+
+ if len(self.remaining_sharemap) < k:
+ self._raise_notenoughshareserror()
self.shares = {} # maps shnum to validated blocks
self._active_readers = [] # list of active readers for this dl.
self._block_hash_trees = {} # shnum => hashtree
+ for i in xrange(self._total_shares):
+ # So we don't have to do this later.
+ self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
+
# We need one share hash tree for the entire file; its leaves
# are the roots of the block hash trees for the shares that
# comprise it, and its root is in the verinfo.
segment with. I return the plaintext associated with that
segment.
"""
- # shnum => block hash tree. Unused, but setup_encoding_parameters will
- # want to set this.
+ # We don't need the block hash trees in this case.
self._block_hash_trees = None
self._setup_encoding_parameters()
(k, n, self._num_segments, self._segment_size,
self._tail_segment_size))
- if self._block_hash_trees is not None:
- for i in xrange(self._total_shares):
- # So we don't have to do this later.
- self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
-
# Our last task is to tell the downloader where to start and
# where to stop. We use three parameters for that:
# - self._start_segment: the segment that we need to start
- # downloading from.
+ # downloading from.
# - self._current_segment: the next segment that we need to
# download.
# - self._last_segment: The last segment that we were asked to
if self._offset:
self.log("got offset: %d" % self._offset)
# our start segment is the first segment containing the
- # offset we were given.
+ # offset we were given.
start = self._offset // self._segment_size
- assert start < self._num_segments
+ _assert(start < self._num_segments,
+ start=start, num_segments=self._num_segments,
+ offset=self._offset, segment_size=self._segment_size)
self._start_segment = start
self.log("got start segment: %d" % self._start_segment)
else:
# but the one before it.
end = (end_data - 1) // self._segment_size
- assert end < self._num_segments
+ _assert(end < self._num_segments,
+ end=end, num_segments=self._num_segments,
+ end_data=end_data, offset=self._offset, read_length=self._read_length,
+ segment_size=self._segment_size)
self._last_segment = end
else:
self._last_segment = self._start_segment
"indicate an uncoordinated write")
# Otherwise, we're okay -- no issues.
-
- def _remove_reader(self, reader):
- """
- At various points, we will wish to remove a server from
- consideration and/or use. These include, but are not necessarily
- limited to:
-
- - A connection error.
- - A mismatched prefix (that is, a prefix that does not match
- our conception of the version information string).
- - A failing block hash, salt hash, or share hash, which can
- indicate disk failure/bit flips, or network trouble.
-
- This method will do that. I will make sure that the
- (shnum,reader) combination represented by my reader argument is
- not used for anything else during this download. I will not
- advise the reader of any corruption, something that my callers
- may wish to do on their own.
- """
- # TODO: When you're done writing this, see if this is ever
- # actually used for something that _mark_bad_share isn't. I have
- # a feeling that they will be used for very similar things, and
- # that having them both here is just going to be an epic amount
- # of code duplication.
- #
- # (well, okay, not epic, but meaningful)
- self.log("removing reader %s" % reader)
- # Remove the reader from _active_readers
- self._active_readers.remove(reader)
- # TODO: self.readers.remove(reader)?
- for shnum in list(self.remaining_sharemap.keys()):
- self.remaining_sharemap.discard(shnum, reader.server)
-
-
def _mark_bad_share(self, server, shnum, reader, f):
"""
I mark the given (server, shnum) as a bad share, which means that it
(shnum, server.get_name()))
prefix = self.verinfo[-2]
self.servermap.mark_bad_share(server, shnum, prefix)
- self._remove_reader(reader)
self._bad_shares.add((server, shnum, f))
self._status.add_problem(server, f)
self._last_failure = f
+
+ # Stop using this reader, and discard its server from remaining_sharemap
+ # under every key.
+ self._active_readers.remove(reader)
+ # Use a distinct loop variable: reusing `shnum` here would rebind this
+ # method's `shnum` argument, and the corruption report below would then
+ # name whichever key was iterated last instead of the bad share.
+ for other_shnum in list(self.remaining_sharemap.keys()):
+     self.remaining_sharemap.discard(other_shnum, reader.server)
+
if f.check(BadShareError):
self.notify_server_corruption(server, shnum, str(f.value))
-
def _download_current_segment(self):
"""
I download, validate, decode, decrypt, and assemble the segment
# Reaching this point means that we know that this segment
# is correct. Now we need to check to see whether the share
- # hash chain is also correct.
+ # hash chain is also correct.
# SDMF wrote share hash chains that didn't contain the
# leaves, which would be produced from the block hash tree.
# So we need to validate the block hash tree first. If
def _raise_notenoughshareserror(self):
"""
- I am called by _activate_enough_servers when there are not enough
- active servers left to complete the download. After making some
- useful logging statements, I throw an exception to that effect
- to the caller of this Retrieve object through
+ I am called when there are not enough active servers left to complete
+ the download. After making some useful logging statements, I throw an
+ exception to that effect to the caller of this Retrieve object through
self._done_deferred.
+
+ I always raise NotEnoughSharesError. The message reports the current
+ segment against the total number of segments, the number of bad shares,
+ the size of remaining_sharemap, the k-of-n encoding parameters, and
+ the last recorded failure.
"""
format = ("ran out of servers: "
- "have %(have)d of %(total)d segments "
- "found %(bad)d bad shares "
+ "have %(have)d of %(total)d segments; "
+ "found %(bad)d bad shares; "
+ "have %(remaining)d remaining shares of the right version; "
"encoding %(k)d-of-%(n)d")
+ # NOTE(review): "need" is supplied below, but the format string above has
+ # no %(need)d placeholder, so it never appears in the message -- confirm
+ # whether it was meant to be reported.
args = {"have": self._current_segment,
"total": self._num_segments,
"need": self._last_segment,
"k": self._required_shares,
"n": self._total_shares,
- "bad": len(self._bad_shares)}
+ "bad": len(self._bad_shares),
+ "remaining": len(self.remaining_sharemap),
+ }
raise NotEnoughSharesError("%s, last failure: %s" %
(format % args, str(self._last_failure)))