def __init__(self):
self.timings = {}
self.timings["fetch_per_server"] = {}
+ self.timings["decode"] = 0.0
+ self.timings["decrypt"] = 0.0
self.timings["cumulative_verify"] = 0.0
self.problems = {}
self.active = True
if peerid not in self.timings["fetch_per_server"]:
self.timings["fetch_per_server"][peerid] = []
self.timings["fetch_per_server"][peerid].append(elapsed)
+ def accumulate_decode_time(self, elapsed):
+ self.timings["decode"] += elapsed
+ def accumulate_decrypt_time(self, elapsed):
+ self.timings["decrypt"] += elapsed
def set_storage_index(self, si):
self.storage_index = si
def set_helper(self, helper):
kwargs["facility"] = "tahoe.mutable.retrieve"
return log.msg(*args, **kwargs)
+ def _set_current_status(self, state):
+ seg = "%d/%d" % (self._current_segment, self._last_segment)
+ self._status.set_status("segment %s (%s)" % (seg, state))
###################
# IPushProducer
# fired when the download is unpaused.
self._old_status = self._status.get_status()
- self._status.set_status("Paused")
+ self._set_current_status("paused")
self._pause_deferred = defer.Deferred()
self._consumer.registerProducer(self, streaming=True)
self._done_deferred = defer.Deferred()
+ self._offset = offset
+ self._read_length = size
+ self._setup_download()
+ self._setup_encoding_parameters()
+ self.log("starting download")
+ self._started_fetching = time.time()
+ d = self._add_active_peers()
+ # ...
+ # The download process beyond this is a state machine.
+ # _add_active_peers will select the peers that we want to use
+ # for the download, and then attempt to start downloading. After
+ # each segment, it will check for doneness, reacting to broken
+ # peers and corrupt shares as necessary. If it runs out of good
+ # peers before downloading all of the segments, _done_deferred
+ # will errback. Otherwise, it will eventually callback with the
+ # contents of the mutable file.
+ return self._done_deferred
+
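+ # A typical caller, assuming the enclosing method is the public
+ # download() entry point (its def line is not shown in this hunk):
+ #
+ #   d = retrieve.download(consumer, offset, size)
+ #   d.addCallbacks(got_plaintext, download_failed)
+ #
+ # got_plaintext and download_failed are hypothetical callbacks; the
+ # Deferred behaves as described in the comment above.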
+ def _setup_download(self):
self._started = time.time()
self._status.set_status("Retrieving Shares")
- self._offset = offset
- self._read_length = size
+ # how many shares do we need?
+ (seqnum,
+ root_hash,
+ IV,
+ segsize,
+ datalength,
+ k,
+ N,
+ prefix,
+ offsets_tuple) = self.verinfo
# first, which servers can we use?
versionmap = self.servermap.make_versionmap()
any_cache)
reader.peerid = peerid
self.readers[shnum] = reader
-
+ assert len(self.remaining_sharemap) >= k
self.shares = {} # maps shnum to validated blocks
self._active_readers = [] # list of active readers for this dl.
# validated the prefix of
self._block_hash_trees = {} # shnum => hashtree
- # how many shares do we need?
- (seqnum,
- root_hash,
- IV,
- segsize,
- datalength,
- k,
- N,
- prefix,
- offsets_tuple) = self.verinfo
-
-
# We need one share hash tree for the entire file; its leaves
# are the roots of the block hash trees for the shares that
# comprise it, and its root is in the verinfo.
self.share_hash_tree = hashtree.IncompleteHashTree(N)
self.share_hash_tree.set_hashes({0: root_hash})
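# Illustrative use of the incomplete tree: set_hashes({0: root_hash})
# pins the root, and share_hash_tree.needed_hashes(shnum) (as used in
# _get_needed_hashes below) reports which interior hashes must still be
# fetched before share shnum's leaf can be checked against root_hash.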
- # This will set up both the segment decoder and the tail segment
- # decoder, as well as a variety of other instance variables that
- # the download process will use.
- self._setup_encoding_parameters()
- assert len(self.remaining_sharemap) >= k
-
- self.log("starting download")
- self._started_fetching = time.time()
-
- self._add_active_peers()
- # The download process beyond this is a state machine.
- # _add_active_peers will select the peers that we want to use
- # for the download, and then attempt to start downloading. After
- # each segment, it will check for doneness, reacting to broken
- # peers and corrupt shares as necessary. If it runs out of good
- # peers before downloading all of the segments, _done_deferred
- # will errback. Otherwise, it will eventually callback with the
- # contents of the mutable file.
- return self._done_deferred
-
-
def decode(self, blocks_and_salts, segnum):
"""
I am a helper method that the mutable file update process uses
"""
# shnum => block hash tree. Unused, but setup_encoding_parameters will
# want to set this.
- # XXX: Make it so that it won't set this if we're just decoding.
self._block_hash_trees = None
self._setup_encoding_parameters()
+
# This is the form expected by decode.
blocks_and_salts = blocks_and_salts.items()
blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
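# i.e. each dict entry (shnum, (block, salt)) becomes
# (True, [(shnum, (block, salt))]), presumably the (success, value)
# shape that the download path's DeferredLists produce, so the normal
# segment-decoding code can consume it unchanged.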
def _setup_encoding_parameters(self):
"""
I set up the encoding parameters, including k, n, the number
- of segments associated with this file, and the segment decoder.
+ of segments associated with this file, and the segment decoders.
"""
(seqnum,
root_hash,
self.log("got offset: %d" % self._offset)
# our start segment is the first segment containing the
# offset we were given.
- start = mathutil.div_ceil(self._offset,
- self._segment_size)
- # this gets us the first segment after self._offset. Then
- # our start segment is the one before it.
- start -= 1
+ start = self._offset // self._segment_size
assert start < self._num_segments
self._start_segment = start
self._start_segment = 0
- if self._read_length:
+ # If self._read_length is None, then we want to read the whole
+ # file. Otherwise, we want to read only part of the file, and
+ # need to figure out where to stop reading.
+ if self._read_length is not None:
# our end segment is the last segment containing part of the
# data that we were asked to read.
self.log("got read length %d" % self._read_length)
- end_data = self._offset + self._read_length
- end = mathutil.div_ceil(end_data,
- self._segment_size)
- end -= 1
- assert end < self._num_segments
- self._last_segment = end
+ if self._read_length != 0:
+ end_data = self._offset + self._read_length
+
+ # We don't actually need to read the byte at end_data,
+ # but the one before it.
+ end = (end_data - 1) // self._segment_size
+
+ assert end < self._num_segments
+ self._last_segment = end
+ else:
+ self._last_segment = self._start_segment
self.log("got end segment: %d" % self._last_segment)
else:
self._last_segment = self._num_segments - 1
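# Worked example with illustrative numbers: segment_size=4096 and
# offset=5000 give start = 5000 // 4096 = 1; read_length=10000 gives
# end_data = 15000 and end = 14999 // 4096 = 3, so segments 1 through 3
# (inclusive) are fetched.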
self._active_readers.append(self.readers[shnum])
self.log("added reader for share %d" % shnum)
assert len(self._active_readers) >= self._required_shares
- # Conceptually, this is part of the _add_active_peers step. It
- # validates the prefixes of newly added readers to make sure
- # that they match what we are expecting for self.verinfo. If
- # validation is successful, _validate_active_prefixes will call
- # _download_current_segment for us. If validation is
- # unsuccessful, then _validate_prefixes will remove the peer and
- # call _add_active_peers again, where we will attempt to rectify
- # the problem by choosing another peer.
- return self._validate_active_prefixes()
-
-
- def _validate_active_prefixes(self):
- """
- I check to make sure that the prefixes on the peers that I am
- currently reading from match the prefix that we want to see, as
- said in self.verinfo.
-
- If I find that all of the active peers have acceptable prefixes,
- I pass control to _download_current_segment, which will use
- those peers to do cool things. If I find that some of the active
- peers have unacceptable prefixes, I will remove them from active
- peers (and from further consideration) and call
- _add_active_peers to attempt to rectify the situation. I keep
- track of which peers I have already validated so that I don't
- need to do so again.
- """
- assert self._active_readers, "No more active readers"
-
- ds = []
new_readers = set(self._active_readers) - self._validated_readers
- self.log('validating %d newly-added active readers' % len(new_readers))
for reader in new_readers:
- # We force a remote read here -- otherwise, we are relying
- # on cached data that we already verified as valid, and we
- # won't detect an uncoordinated write that has occurred
- # since the last servermap update.
- d = reader.get_prefix(force_remote=True)
- d.addCallback(self._try_to_validate_prefix, reader)
- ds.append(d)
- dl = defer.DeferredList(ds, consumeErrors=True)
- def _check_results(results):
- # Each result in results will be of the form (success, msg).
- # We don't care about msg, but success will tell us whether
- # or not the checkstring validated. If it didn't, we need to
- # remove the offending (peer,share) from our active readers,
- # and ensure that active readers is again populated.
- bad_readers = []
- for i, result in enumerate(results):
- if not result[0]:
- reader = self._active_readers[i]
- f = result[1]
- assert isinstance(f, failure.Failure)
-
- self.log("The reader %s failed to "
- "properly validate: %s" % \
- (reader, str(f.value)))
- bad_readers.append((reader, f))
- else:
- reader = self._active_readers[i]
- self.log("the reader %s checks out, so we'll use it" % \
- reader)
- self._validated_readers.add(reader)
- # Each time we validate a reader, we check to see if
- # we need the private key. If we do, we politely ask
- # for it and then continue computing. If we find
- # that we haven't gotten it at the end of
- # segment decoding, then we'll take more drastic
- # measures.
- if self._need_privkey and not self._node.is_readonly():
- d = reader.get_encprivkey()
- d.addCallback(self._try_to_validate_privkey, reader)
- if bad_readers:
- # We do them all at once, or else we screw up list indexing.
- for (reader, f) in bad_readers:
- self._mark_bad_share(reader, f)
- if self._verify:
- if len(self._active_readers) >= self._required_shares:
- return self._download_current_segment()
- else:
- return self._failed()
- else:
- return self._add_active_peers()
- else:
- return self._download_current_segment()
- # The next step will assert that it has enough active
- # readers to fetch shares; we just need to remove it.
- dl.addCallback(_check_results)
- return dl
+ self._validated_readers.add(reader)
+ # Each time we validate a reader, we check to see if we need the
+ # private key. If we do, we politely ask for it and then continue
+ # computing. If we find that we haven't gotten it at the end of
+ # segment decoding, then we'll take more drastic measures.
+ if self._need_privkey and not self._node.is_readonly():
+ d = reader.get_encprivkey()
+ d.addCallback(self._try_to_validate_privkey, reader)
+ # XXX: don't just drop the Deferred. We need error-reporting
+ # but not flow-control here.
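+ # (A minimal option, not part of this change: d.addErrback(log.err),
+ # assuming the logging module in scope provides err, would at least
+ # record privkey-fetch failures.)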
+ return self._download_current_segment()
def _try_to_validate_prefix(self, prefix, reader):
ds = []
for reader in self._active_readers:
started = time.time()
- d = reader.get_block_and_salt(segnum, queue=True)
+ d = reader.get_block_and_salt(segnum)
d2 = self._get_needed_hashes(reader, segnum)
dl = defer.DeferredList([d, d2], consumeErrors=True)
dl.addCallback(self._validate_block, segnum, reader, started)
dl.addErrback(self._validation_or_decoding_failed, [reader])
ds.append(dl)
- reader.flush()
dl = defer.DeferredList(ds)
if self._verify:
dl.addCallback(lambda ignored: "")
# validate this block, then generate the block hash root.
self.log("validating share %d for segment %d" % (reader.shnum,
segnum))
- self._status.add_fetch_timing(reader.peerid, started)
- self._status.set_status("Valdiating blocks for segment %d" % segnum)
+ elapsed = time.time() - started
+ self._status.add_fetch_timing(reader.peerid, elapsed)
+ self._set_current_status("validating blocks")
# Did we fail to fetch either of the things that we were
# supposed to? Fail if so.
- if not results[0][0] and results[1][0]:
+ if not results[0][0] or not results[1][0]:
#needed.discard(0)
self.log("getting blockhashes for segment %d, share %d: %s" % \
(segnum, reader.shnum, str(needed)))
- d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
+ d1 = reader.get_blockhashes(needed, force_remote=True)
if self.share_hash_tree.needed_hashes(reader.shnum):
need = self.share_hash_tree.needed_hashes(reader.shnum)
self.log("also need sharehashes for share %d: %s" % (reader.shnum,
str(need)))
- d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
+ d2 = reader.get_sharehashes(need, force_remote=True)
else:
d2 = defer.succeed({}) # the logic in the next method
# expects a dict
shareids.append(shareid)
shares.append(share)
- self._status.set_status("Decoding")
+ self._set_current_status("decoding")
started = time.time()
assert len(shareids) >= self._required_shares, len(shareids)
# zfec really doesn't want extra shares
size_to_use = self._segment_size
segment = segment[:size_to_use]
self.log(" segment len=%d" % len(segment))
- self._status.timings.setdefault("decode", 0)
- self._status.timings['decode'] = time.time() - started
+ self._status.accumulate_decode_time(time.time() - started)
return segment, salt
d.addCallback(_process)
return d
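# For orientation: the segment decoders set up in
# _setup_encoding_parameters are zfec-based. A minimal standalone
# round-trip (assumed API, illustrative only) looks like:
#   from zfec import Encoder, Decoder
#   blocks = Encoder(k, n).encode(equal_length_pieces)
#   pieces = Decoder(k, n).decode(blocks[:k], list(range(k)))
# Any k of the n blocks suffice, and the decoder wants exactly k of
# them, which is why the extra shares are trimmed above.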
the plaintext of the segment that is in my argument.
"""
segment, salt = segment_and_salt
- self._status.set_status("decrypting")
+ self._set_current_status("decrypting")
self.log("decrypting segment %d" % self._current_segment)
started = time.time()
key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
decryptor = AES(key)
plaintext = decryptor.process(segment)
- self._status.timings.setdefault("decrypt", 0)
- self._status.timings['decrypt'] = time.time() - started
+ self._status.accumulate_decrypt_time(time.time() - started)
return plaintext
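# (AES here is assumed to be a counter-mode cipher, as elsewhere in
# Tahoe, so process() is the same operation for encryption and
# decryption; the key derived from (salt, readkey) is distinct per
# segment because each segment carries its own salt.)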
now = time.time()
self._status.timings['total'] = now - self._started
self._status.timings['fetch'] = now - self._started_fetching
+ self._status.set_status("Finished")
+ self._status.set_progress(1.0)
+
+ # remember the encoding parameters so we can use them again next time
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = self.verinfo
+ self._node._populate_required_shares(k)
+ self._node._populate_total_shares(N)
if self._verify:
ret = list(self._bad_shares)
now = time.time()
self._status.timings['total'] = now - self._started
self._status.timings['fetch'] = now - self._started_fetching
+ self._status.set_status("Failed")
if self._verify:
ret = list(self._bad_shares)