From 9756146d616735522f38e3cdc8b62e234820d467 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Sun, 28 Aug 2011 16:22:21 -0700
Subject: [PATCH] Fix mutable publish/retrieve timing status displays. Fixes
 #1505.

publish:
* encrypt and encode times are cumulative, not just current-segment

retrieve:
* same for decrypt and decode times
* update "current status" to include segment number
* set status to Finished/Failed when download is complete
* set progress to 1.0 when complete

More improvements to consider:
* progress is currently 0% or 100%: should calculate how many segments are
  involved (remembering retrieve can be less than the whole file) and set it
  to a fraction
* "fetch" time is fuzzy: what we want is to know how much of the delay is not
  our own fault, but since we do decode/decrypt work while waiting for more
  shares, it's not straightforward
---
 src/allmydata/mutable/publish.py  | 10 ++++++++--
 src/allmydata/mutable/retrieve.py | 29 ++++++++++++++++++++---------
 2 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py
index 066ee904..e74be2ec 100644
--- a/src/allmydata/mutable/publish.py
+++ b/src/allmydata/mutable/publish.py
@@ -33,6 +33,8 @@ class PublishStatus:
     def __init__(self):
         self.timings = {}
         self.timings["send_per_server"] = {}
+        self.timings["encrypt"] = 0.0
+        self.timings["encode"] = 0.0
         self.servermap = None
         self.problems = {}
         self.active = True
@@ -49,6 +51,10 @@ class PublishStatus:
         if peerid not in self.timings["send_per_server"]:
             self.timings["send_per_server"][peerid] = []
         self.timings["send_per_server"][peerid].append(elapsed)
+    def accumulate_encode_time(self, elapsed):
+        self.timings["encode"] += elapsed
+    def accumulate_encrypt_time(self, elapsed):
+        self.timings["encrypt"] += elapsed
 
     def get_started(self):
         return self.started
@@ -711,7 +717,7 @@ class Publish:
         assert len(crypttext) == len(data)
 
         now = time.time()
-        self._status.timings["encrypt"] = now - started
+        self._status.accumulate_encrypt_time(now - started)
         started = now
 
         # now apply FEC
@@ -732,7 +738,7 @@ class Publish:
         d = fec.encode(crypttext_pieces)
         def _done_encoding(res):
             elapsed = time.time() - started
-            self._status.timings["encode"] = elapsed
+            self._status.accumulate_encode_time(elapsed)
             return (res, salt)
         d.addCallback(_done_encoding)
         return d
diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index b1ec761e..7498a134 100644
--- a/src/allmydata/mutable/retrieve.py
+++ b/src/allmydata/mutable/retrieve.py
@@ -24,6 +24,8 @@ class RetrieveStatus:
     def __init__(self):
         self.timings = {}
         self.timings["fetch_per_server"] = {}
+        self.timings["decode"] = 0.0
+        self.timings["decrypt"] = 0.0
         self.timings["cumulative_verify"] = 0.0
         self.problems = {}
         self.active = True
@@ -59,6 +61,10 @@ class RetrieveStatus:
         if peerid not in self.timings["fetch_per_server"]:
             self.timings["fetch_per_server"][peerid] = []
         self.timings["fetch_per_server"][peerid].append(elapsed)
+    def accumulate_decode_time(self, elapsed):
+        self.timings["decode"] += elapsed
+    def accumulate_decrypt_time(self, elapsed):
+        self.timings["decrypt"] += elapsed
     def set_storage_index(self, si):
         self.storage_index = si
     def set_helper(self, helper):
@@ -153,6 +159,9 @@ class Retrieve:
             kwargs["facility"] = "tahoe.mutable.retrieve"
         return log.msg(*args, **kwargs)
 
+    def _set_current_status(self, state):
+        seg = "%d/%d" % (self._current_segment, self._last_segment)
+        self._status.set_status("segment %s (%s)" % (seg, state))
 
     ###################
     # IPushProducer
@@ -168,7 +177,7 @@ class Retrieve:
 
         # fired when the download is unpaused. 
         self._old_status = self._status.get_status()
-        self._status.set_status("Paused")
+        self._set_current_status("paused")
 
         self._pause_deferred = defer.Deferred()
 
@@ -806,8 +815,9 @@ class Retrieve:
         # validate this block, then generate the block hash root.
         self.log("validating share %d for segment %d" % (reader.shnum,
                                                              segnum))
-        self._status.add_fetch_timing(reader.peerid, started)
-        self._status.set_status("Valdiating blocks for segment %d" % segnum)
+        elapsed = time.time() - started
+        self._status.add_fetch_timing(reader.peerid, elapsed)
+        self._set_current_status("validating blocks")
         # Did we fail to fetch either of the things that we were
         # supposed to? Fail if so.
         if not results[0][0] and results[1][0]:
@@ -946,7 +956,7 @@ class Retrieve:
             shareids.append(shareid)
             shares.append(share)
 
-        self._status.set_status("Decoding")
+        self._set_current_status("decoding")
         started = time.time()
         assert len(shareids) >= self._required_shares, len(shareids)
         # zfec really doesn't want extra shares
@@ -971,8 +981,7 @@ class Retrieve:
                 size_to_use = self._segment_size
             segment = segment[:size_to_use]
             self.log(" segment len=%d" % len(segment))
-            self._status.timings.setdefault("decode", 0)
-            self._status.timings['decode'] = time.time() - started
+            self._status.accumulate_decode_time(time.time() - started)
             return segment, salt
         d.addCallback(_process)
         return d
@@ -984,14 +993,13 @@ class Retrieve:
         the plaintext of the segment that is in my argument.
         """
         segment, salt = segment_and_salt
-        self._status.set_status("decrypting")
+        self._set_current_status("decrypting")
         self.log("decrypting segment %d" % self._current_segment)
         started = time.time()
         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
         decryptor = AES(key)
         plaintext = decryptor.process(segment)
-        self._status.timings.setdefault("decrypt", 0)
-        self._status.timings['decrypt'] = time.time() - started
+        self._status.accumulate_decrypt_time(time.time() - started)
         return plaintext
 
 
@@ -1079,6 +1087,8 @@ class Retrieve:
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
+        self._status.set_status("Finished")
+        self._status.set_progress(1.0)
 
         # remember the encoding parameters, use them again next time
         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
@@ -1109,6 +1119,7 @@ class Retrieve:
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
+        self._status.set_status("Failed")
 
         if self._verify:
             ret = list(self._bad_shares)
-- 
2.45.2