From 9756146d616735522f38e3cdc8b62e234820d467 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 28 Aug 2011 16:22:21 -0700 Subject: [PATCH] Fix mutable publish/retrieve timing status displays. Fixes #1505. publish: * encrypt and encode times are cumulative, not just current-segment retrieve: * same for decrypt and decode times * update "current status" to include segment number * set status to Finished/Failed when download is complete * set progress to 1.0 when complete More improvements to consider: * progress is currently 0% or 100%: should calculate how many segments are involved (remembering retrieve can be less than the whole file) and set it to a fraction * "fetch" time is fuzzy: what we want is to know how much of the delay is not our own fault, but since we do decode/decrypt work while waiting for more shares, it's not straightforward --- src/allmydata/mutable/publish.py | 10 ++++++++-- src/allmydata/mutable/retrieve.py | 29 ++++++++++++++++++++--------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index 066ee904..e74be2ec 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -33,6 +33,8 @@ class PublishStatus: def __init__(self): self.timings = {} self.timings["send_per_server"] = {} + self.timings["encrypt"] = 0.0 + self.timings["encode"] = 0.0 self.servermap = None self.problems = {} self.active = True @@ -49,6 +51,10 @@ class PublishStatus: if peerid not in self.timings["send_per_server"]: self.timings["send_per_server"][peerid] = [] self.timings["send_per_server"][peerid].append(elapsed) + def accumulate_encode_time(self, elapsed): + self.timings["encode"] += elapsed + def accumulate_encrypt_time(self, elapsed): + self.timings["encrypt"] += elapsed def get_started(self): return self.started @@ -711,7 +717,7 @@ class Publish: assert len(crypttext) == len(data) now = time.time() - self._status.timings["encrypt"] = now - started + self._status.accumulate_encrypt_time(now - started) started = now # now apply FEC @@ -732,7 +738,7 @@ class Publish: d = fec.encode(crypttext_pieces) def _done_encoding(res): elapsed = time.time() - started - self._status.timings["encode"] = elapsed + self._status.accumulate_encode_time(elapsed) return (res, salt) d.addCallback(_done_encoding) return d diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index b1ec761e..7498a134 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -24,6 +24,8 @@ class RetrieveStatus: def __init__(self): self.timings = {} self.timings["fetch_per_server"] = {} + self.timings["decode"] = 0.0 + self.timings["decrypt"] = 0.0 self.timings["cumulative_verify"] = 0.0 self.problems = {} self.active = True @@ -59,6 +61,10 @@ class RetrieveStatus: if peerid not in self.timings["fetch_per_server"]: self.timings["fetch_per_server"][peerid] = [] self.timings["fetch_per_server"][peerid].append(elapsed) + def accumulate_decode_time(self, elapsed): + self.timings["decode"] += elapsed + def accumulate_decrypt_time(self, elapsed): + self.timings["decrypt"] += elapsed def set_storage_index(self, si): self.storage_index = si def set_helper(self, helper): @@ -153,6 +159,9 @@ class Retrieve: kwargs["facility"] = "tahoe.mutable.retrieve" return log.msg(*args, **kwargs) + def _set_current_status(self, state): + seg = "%d/%d" % (self._current_segment, self._last_segment) + self._status.set_status("segment %s (%s)" % (seg, state)) ################### # IPushProducer @@ -168,7 +177,7 @@ class Retrieve: # fired when the download is unpaused. self._old_status = self._status.get_status() - self._status.set_status("Paused") + self._set_current_status("paused") self._pause_deferred = defer.Deferred() @@ -806,8 +815,9 @@ class Retrieve: # validate this block, then generate the block hash root. self.log("validating share %d for segment %d" % (reader.shnum, segnum)) - self._status.add_fetch_timing(reader.peerid, started) - self._status.set_status("Valdiating blocks for segment %d" % segnum) + elapsed = time.time() - started + self._status.add_fetch_timing(reader.peerid, elapsed) + self._set_current_status("validating blocks") # Did we fail to fetch either of the things that we were # supposed to? Fail if so. if not results[0][0] and results[1][0]: @@ -946,7 +956,7 @@ class Retrieve: shareids.append(shareid) shares.append(share) - self._status.set_status("Decoding") + self._set_current_status("decoding") started = time.time() assert len(shareids) >= self._required_shares, len(shareids) # zfec really doesn't want extra shares @@ -971,8 +981,7 @@ class Retrieve: size_to_use = self._segment_size segment = segment[:size_to_use] self.log(" segment len=%d" % len(segment)) - self._status.timings.setdefault("decode", 0) - self._status.timings['decode'] = time.time() - started + self._status.accumulate_decode_time(time.time() - started) return segment, salt d.addCallback(_process) return d @@ -984,14 +993,13 @@ class Retrieve: the plaintext of the segment that is in my argument. """ segment, salt = segment_and_salt - self._status.set_status("decrypting") + self._set_current_status("decrypting") self.log("decrypting segment %d" % self._current_segment) started = time.time() key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey()) decryptor = AES(key) plaintext = decryptor.process(segment) - self._status.timings.setdefault("decrypt", 0) - self._status.timings['decrypt'] = time.time() - started + self._status.accumulate_decrypt_time(time.time() - started) return plaintext @@ -1079,6 +1087,8 @@ class Retrieve: now = time.time() self._status.timings['total'] = now - self._started self._status.timings['fetch'] = now - self._started_fetching + self._status.set_status("Finished") + self._status.set_progress(1.0) # remember the encoding parameters, use them again next time (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, @@ -1109,6 +1119,7 @@ class Retrieve: now = time.time() self._status.timings['total'] = now - self._started self._status.timings['fetch'] = now - self._started_fetching + self._status.set_status("Failed") if self._verify: ret = list(self._bad_shares) -- 2.37.2