from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually, DeadReferenceError, \
RemoteException
+
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
DownloadStopped, MDMF_VERSION, SDMF_VERSION
+from allmydata.util.assertutil import _assert, precondition
from allmydata.util import hashutil, log, mathutil, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
return self._problems
    def add_fetch_timing(self, server, elapsed):
- serverid = server.get_serverid()
- if serverid not in self.timings["fetch_per_server"]:
- self.timings["fetch_per_server"][serverid] = []
- self.timings["fetch_per_server"][serverid].append(elapsed)
+ # Key per-server fetch timings by the IServer object itself rather
+ # than by its serverid, removing the get_serverid() lookup.
+ # NOTE(review): assumes every reader of timings["fetch_per_server"]
+ # was updated to expect IServer keys -- confirm against callers.
+ if server not in self.timings["fetch_per_server"]:
+ self.timings["fetch_per_server"][server] = []
+ self.timings["fetch_per_server"][server].append(elapsed)
def accumulate_decode_time(self, elapsed):
self.timings["decode"] += elapsed
def accumulate_decrypt_time(self, elapsed):
def __init__(self, filenode, storage_broker, servermap, verinfo,
fetch_privkey=False, verify=False):
self._node = filenode
- assert self._node.get_pubkey()
+ _assert(self._node.get_pubkey())
self._storage_broker = storage_broker
self._storage_index = filenode.get_storage_index()
- assert self._node.get_readkey()
+ _assert(self._node.get_readkey())
self._last_failure = None
prefix = si_b2a(self._storage_index)[:5]
self._log_number = log.msg("Retrieve(%s): starting" % prefix)
self._bad_shares = set()
self.servermap = servermap
- assert self._node.get_pubkey()
self.verinfo = verinfo
+ # TODO: make it possible to use self.verinfo.datalength instead
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = self.verinfo
+ self._data_length = datalength
# during repair, we may be called upon to grab the private key, since
# it wasn't picked up during a verify=False checker run, and we'll
# need it for repair to generate a new version.
# verify means that we are using the downloader logic to verify all
# of our shares. This tells the downloader a few things.
- #
+ #
# 1. We need to download all of the shares.
# 2. We don't need to decode or decrypt the shares, since our
# caller doesn't care about the plaintext, only the
self._status.set_helper(False)
self._status.set_progress(0.0)
self._status.set_active(True)
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
- offsets_tuple) = self.verinfo
self._status.set_size(datalength)
self._status.set_encoding(k, N)
self.readers = {}
    def download(self, consumer=None, offset=0, size=None):
- assert IConsumer.providedBy(consumer) or self._verify
-
+ precondition(self._verify or IConsumer.providedBy(consumer))
+ # size=None means "read from offset to the end of the file"
+ # (self._data_length is set from verinfo in __init__).
+ if size is None:
+ size = self._data_length - offset
+ if self._verify:
+ # a verify pass must cover the entire file
+ _assert(size == self._data_length, (size, self._data_length))
+ self.log("starting download")
+ self._done_deferred = defer.Deferred()
        if consumer:
            self._consumer = consumer
- # we provide IPushProducer, so streaming=True, per
- # IConsumer.
+ # we provide IPushProducer, so streaming=True, per IConsumer.
            self._consumer.registerProducer(self, streaming=True)
+ # start-time bookkeeping, moved here from _setup_download()
+ self._started = time.time()
+ self._started_fetching = time.time()
+ if size == 0:
+ # short-circuit the rest of the process
+ self._done()
+ else:
+ self._start_download(consumer, offset, size)
+ return self._done_deferred
+
+ def _start_download(self, consumer, offset, size):
+ # Internal helper for download(): begin fetching the byte range
+ # [offset, offset+size). size > 0 is guaranteed here because
+ # download() short-circuits the size == 0 case before calling us.
+ precondition((0 <= offset < self._data_length)
+ and (size > 0)
+ and (offset+size <= self._data_length),
+ (offset, size, self._data_length))
- self._done_deferred = defer.Deferred()
        self._offset = offset
        self._read_length = size
- self._setup_download()
        self._setup_encoding_parameters()
- self.log("starting download")
- self._started_fetching = time.time()
+ # _setup_download() now runs *after* _setup_encoding_parameters():
+ # it builds the per-share block hash trees, which need
+ # _num_segments/_total_shares to have been computed first.
+ self._setup_download()
+
        # The download process beyond this is a state machine.
        # _add_active_servers will select the servers that we want to use
        # for the download, and then attempt to start downloading. After
        # will errback. Otherwise, it will eventually callback with the
        # contents of the mutable file.
        self.loop()
- return self._done_deferred
def loop(self):
d = fireEventually(None) # avoid #237 recursion limit problem
d.addErrback(self._error)
def _setup_download(self):
- self._started = time.time()
self._status.set_status("Retrieving Shares")
# how many shares do we need?
self.remaining_sharemap = DictOfSets()
for (shnum, server, timestamp) in shares:
self.remaining_sharemap.add(shnum, server)
- # If the servermap update fetched anything, it fetched at least 1
- # KiB, so we ask for that much.
- # TODO: Change the cache methods to allow us to fetch all of the
- # data that they have, then change this method to do that.
- any_cache = self._node._read_from_cache(self.verinfo, shnum,
- 0, 1000)
- reader = MDMFSlotReadProxy(server.get_rref(),
- self._storage_index,
- shnum,
- any_cache)
+ # Reuse the SlotReader from the servermap.
+ key = (self.verinfo, server.get_serverid(),
+ self._storage_index, shnum)
+ if key in self.servermap.proxies:
+ reader = self.servermap.proxies[key]
+ else:
+ reader = MDMFSlotReadProxy(server.get_rref(),
+ self._storage_index, shnum, None)
reader.server = server
self.readers[shnum] = reader
- assert len(self.remaining_sharemap) >= k
+
+ if len(self.remaining_sharemap) < k:
+ self._raise_notenoughshareserror()
self.shares = {} # maps shnum to validated blocks
self._active_readers = [] # list of active readers for this dl.
self._block_hash_trees = {} # shnum => hashtree
+ for i in xrange(self._total_shares):
+ # So we don't have to do this later.
+ self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
+
# We need one share hash tree for the entire file; its leaves
# are the roots of the block hash trees for the shares that
# comprise it, and its root is in the verinfo.
segment with. I return the plaintext associated with that
segment.
"""
- # shnum => block hash tree. Unused, but setup_encoding_parameters will
- # want to set this.
+ # We don't need the block hash trees in this case.
self._block_hash_trees = None
+ self._offset = 0
+ self._read_length = self._data_length
self._setup_encoding_parameters()
# _decode_blocks() expects the output of a gatherResults that
self._required_shares = k
self._total_shares = n
self._segment_size = segsize
- self._data_length = datalength
+ #self._data_length = datalength # set during __init__()
if not IV:
self._version = MDMF_VERSION
(k, n, self._num_segments, self._segment_size,
self._tail_segment_size))
- if self._block_hash_trees is not None:
- for i in xrange(self._total_shares):
- # So we don't have to do this later.
- self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
-
# Our last task is to tell the downloader where to start and
# where to stop. We use three parameters for that:
# - self._start_segment: the segment that we need to start
- # downloading from.
+ # downloading from.
# - self._current_segment: the next segment that we need to
# download.
# - self._last_segment: The last segment that we were asked to
if self._offset:
self.log("got offset: %d" % self._offset)
# our start segment is the first segment containing the
- # offset we were given.
+ # offset we were given.
start = self._offset // self._segment_size
- assert start < self._num_segments
+ _assert(start <= self._num_segments,
+ start=start, num_segments=self._num_segments,
+ offset=self._offset, segment_size=self._segment_size)
self._start_segment = start
self.log("got start segment: %d" % self._start_segment)
else:
self._start_segment = 0
-
- # If self._read_length is None, then we want to read the whole
- # file. Otherwise, we want to read only part of the file, and
- # need to figure out where to stop reading.
- if self._read_length is not None:
- # our end segment is the last segment containing part of the
- # segment that we were asked to read.
- self.log("got read length %d" % self._read_length)
- if self._read_length != 0:
- end_data = self._offset + self._read_length
-
- # We don't actually need to read the byte at end_data,
- # but the one before it.
- end = (end_data - 1) // self._segment_size
-
- assert end < self._num_segments
- self._last_segment = end
- else:
- self._last_segment = self._start_segment
- self.log("got end segment: %d" % self._last_segment)
- else:
- self._last_segment = self._num_segments - 1
+ # We might want to read only part of the file, and need to figure out
+ # where to stop reading. Our end segment is the last segment
+ # containing part of the segment that we were asked to read.
+ _assert(self._read_length > 0, self._read_length)
+ end_data = self._offset + self._read_length
+
+ # We don't actually need to read the byte at end_data, but the one
+ # before it.
+ end = (end_data - 1) // self._segment_size
+ _assert(0 <= end < self._num_segments,
+ end=end, num_segments=self._num_segments,
+ end_data=end_data, offset=self._offset,
+ read_length=self._read_length, segment_size=self._segment_size)
+ self._last_segment = end
+ self.log("got end segment: %d" % self._last_segment)
self._current_segment = self._start_segment
"indicate an uncoordinated write")
# Otherwise, we're okay -- no issues.
-
- def _remove_reader(self, reader):
- """
- At various points, we will wish to remove a server from
- consideration and/or use. These include, but are not necessarily
- limited to:
-
- - A connection error.
- - A mismatched prefix (that is, a prefix that does not match
- our conception of the version information string).
- - A failing block hash, salt hash, or share hash, which can
- indicate disk failure/bit flips, or network trouble.
-
- This method will do that. I will make sure that the
- (shnum,reader) combination represented by my reader argument is
- not used for anything else during this download. I will not
- advise the reader of any corruption, something that my callers
- may wish to do on their own.
- """
- # TODO: When you're done writing this, see if this is ever
- # actually used for something that _mark_bad_share isn't. I have
- # a feeling that they will be used for very similar things, and
- # that having them both here is just going to be an epic amount
- # of code duplication.
- #
- # (well, okay, not epic, but meaningful)
- self.log("removing reader %s" % reader)
- # Remove the reader from _active_readers
- self._active_readers.remove(reader)
- # TODO: self.readers.remove(reader)?
- for shnum in list(self.remaining_sharemap.keys()):
- self.remaining_sharemap.discard(shnum, reader.server)
+ # NOTE(review): _remove_reader() was deleted; its reader-removal
+ # logic (dropping the reader from _active_readers and discarding
+ # its shares from remaining_sharemap) is now inlined in
+ # _mark_bad_share().
-
-
def _mark_bad_share(self, server, shnum, reader, f):
"""
I mark the given (server, shnum) as a bad share, which means that it
(shnum, server.get_name()))
prefix = self.verinfo[-2]
self.servermap.mark_bad_share(server, shnum, prefix)
- self._remove_reader(reader)
self._bad_shares.add((server, shnum, f))
self._status.add_problem(server, f)
self._last_failure = f
+
+ # Remove the reader from _active_readers
+ self._active_readers.remove(reader)
+ for shnum in list(self.remaining_sharemap.keys()):
+ self.remaining_sharemap.discard(shnum, reader.server)
+
if f.check(BadShareError):
self.notify_server_corruption(server, shnum, str(f.value))
-
def _download_current_segment(self):
"""
I download, validate, decode, decrypt, and assemble the segment
that this Retrieve is currently responsible for downloading.
"""
+
if self._current_segment > self._last_segment:
# No more segments to download, we're done.
self.log("got plaintext, done")
target that is handling the file download.
"""
self.log("got plaintext for segment %d" % self._current_segment)
+
+ if self._read_length == 0:
+ self.log("on first+last segment, size=0, using 0 bytes")
+ segment = b""
+
+ if self._current_segment == self._last_segment:
+ # trim off the tail
+ wanted = (self._offset + self._read_length) % self._segment_size
+ if wanted != 0:
+ self.log("on the last segment: using first %d bytes" % wanted)
+ segment = segment[:wanted]
+ else:
+ self.log("on the last segment: using all %d bytes" %
+ len(segment))
+
if self._current_segment == self._start_segment:
- # We're on the first segment. It's possible that we want
- # only some part of the end of this segment, and that we
- # just downloaded the whole thing to get that part. If so,
- # we need to account for that and give the reader just the
- # data that they want.
- n = self._offset % self._segment_size
- self.log("stripping %d bytes off of the first segment" % n)
- self.log("original segment length: %d" % len(segment))
- segment = segment[n:]
- self.log("new segment length: %d" % len(segment))
-
- if self._current_segment == self._last_segment and self._read_length is not None:
- # We're on the last segment. It's possible that we only want
- # part of the beginning of this segment, and that we
- # downloaded the whole thing anyway. Make sure to give the
- # caller only the portion of the segment that they want to
- # receive.
- extra = self._read_length
- if self._start_segment != self._last_segment:
- extra -= self._segment_size - \
- (self._offset % self._segment_size)
- extra %= self._segment_size
- self.log("original segment length: %d" % len(segment))
- segment = segment[:extra]
- self.log("new segment length: %d" % len(segment))
- self.log("only taking %d bytes of the last segment" % extra)
+ # Trim off the head, if offset != 0. This should also work if
+ # start==last, because we trim the tail first.
+ skip = self._offset % self._segment_size
+ self.log("on the first segment: skipping first %d bytes" % skip)
+ segment = segment[skip:]
if not self._verify:
self._consumer.write(segment)
# wrong data, and CorruptShareError which happens later, when we
# perform integrity checks on the data.
- assert isinstance(readers, list)
+ precondition(isinstance(readers, list), readers)
bad_shnums = [reader.shnum for reader in readers]
self.log("validation or decoding failed on share(s) %s, server(s) %s "
block_and_salt, blockhashes, sharehashes = results
block, salt = block_and_salt
+ _assert(type(block) is str, (block, salt))
blockhashes = dict(enumerate(blockhashes))
self.log("the reader gave me the following blockhashes: %s" % \
# Reaching this point means that we know that this segment
# is correct. Now we need to check to see whether the share
- # hash chain is also correct.
+ # hash chain is also correct.
# SDMF wrote share hash chains that didn't contain the
# leaves, which would be produced from the block hash tree.
# So we need to validate the block hash tree first. If
#needed.discard(0)
self.log("getting blockhashes for segment %d, share %d: %s" % \
(segnum, reader.shnum, str(needed)))
- d1 = reader.get_blockhashes(needed, force_remote=True)
+ # TODO is force_remote necessary here?
+ d1 = reader.get_blockhashes(needed, force_remote=False)
if self.share_hash_tree.needed_hashes(reader.shnum):
need = self.share_hash_tree.needed_hashes(reader.shnum)
self.log("also need sharehashes for share %d: %s" % (reader.shnum,
str(need)))
- d2 = reader.get_sharehashes(need, force_remote=True)
+ d2 = reader.get_sharehashes(need, force_remote=False)
else:
d2 = defer.succeed({}) # the logic in the next method
# expects a dict
self._set_current_status("decoding")
started = time.time()
- assert len(shareids) >= self._required_shares, len(shareids)
+ _assert(len(shareids) >= self._required_shares, len(shareids))
# zfec really doesn't want extra shares
shareids = shareids[:self._required_shares]
shares = shares[:self._required_shares]
    def _raise_notenoughshareserror(self):
        """
- I am called by _activate_enough_servers when there are not enough
- active servers left to complete the download. After making some
- useful logging statements, I throw an exception to that effect
- to the caller of this Retrieve object through
+ I am called when there are not enough active servers left to complete
+ the download. After making some useful logging statements, I throw an
+ exception to that effect to the caller of this Retrieve object through
        self._done_deferred.
        """
        format = ("ran out of servers: "
- "have %(have)d of %(total)d segments "
- "found %(bad)d bad shares "
+ "have %(have)d of %(total)d segments; "
+ "found %(bad)d bad shares; "
+ "have %(remaining)d remaining shares of the right version; "
                  "encoding %(k)d-of-%(n)d")
        args = {"have": self._current_segment,
                "total": self._num_segments,
                "need": self._last_segment,
                "k": self._required_shares,
                "n": self._total_shares,
- "bad": len(self._bad_shares)}
+ "bad": len(self._bad_shares),
+ # shares still mapped in remaining_sharemap, i.e. shares of the
+ # wanted version that have not been discarded as bad
+ "remaining": len(self.remaining_sharemap),
+ }
        raise NotEnoughSharesError("%s, last failure: %s" %
                                   (format % args, str(self._last_failure)))