From ec23da28a6a8daf6c6bd61a86bc0bfc8d42063e7 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@allmydata.com>
Date: Tue, 4 Mar 2008 21:04:36 -0700
Subject: [PATCH] webish: add more mutable-retrieve timing status

---
 src/allmydata/mutable.py                |  59 ++++++++-
 src/allmydata/web/retrieve-status.xhtml |  24 ++++
 src/allmydata/webish.py                 | 162 ++++++++++++++++--------
 3 files changed, 185 insertions(+), 60 deletions(-)

diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py
index f983d3b7..49569852 100644
--- a/src/allmydata/mutable.py
+++ b/src/allmydata/mutable.py
@@ -203,10 +203,14 @@ class RetrieveStatus:
     statusid_counter = count(0)
     def __init__(self):
         self.timings = {}
-        self.sharemap = None
+        self.timings["fetch_per_server"] = {}
+        self.sharemap = {}
+        self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
+        self.encoding = ("?","?")
+        self.search_distance = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
@@ -217,6 +221,10 @@ class RetrieveStatus:
         return self.started
     def get_storage_index(self):
         return self.storage_index
+    def get_encoding(self):
+        return self.encoding
+    def get_search_distance(self):
+        return self.search_distance
     def using_helper(self):
         return self.helper
     def get_size(self):
@@ -234,6 +242,10 @@ class RetrieveStatus:
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
+    def set_encoding(self, k, n):
+        self.encoding = (k, n)
+    def set_search_distance(self, value):
+        self.search_distance = value
     def set_size(self, size):
         self.size = size
     def set_status(self, status):
@@ -346,6 +358,7 @@ class Retrieve:
         # the hashes over and over again.
         self._valid_shares = {}
 
+        self._started = time.time()
         self._done_deferred = defer.Deferred()
 
         d = defer.succeed(initial_query_count)
@@ -359,6 +372,7 @@ class Retrieve:
 
     def _choose_initial_peers(self, numqueries):
         n = self._node
+        started = time.time()
         full_peerlist = n._client.get_permuted_peers("storage",
                                                      self._storage_index)
 
@@ -373,9 +387,13 @@ class Retrieve:
         # we later increase this limit, it may be useful to re-scan the
         # permuted list.
         self._peerlist_limit = numqueries
+        self._status.set_search_distance(len(self._peerlist))
+        elapsed = time.time() - started
+        self._status.timings["peer_selection"] = elapsed
         return self._peerlist
 
     def _send_initial_requests(self, peerlist):
+        self._first_query_sent = time.time()
         self._bad_peerids = set()
         self._running = True
         self._queries_outstanding = set()
@@ -392,9 +410,11 @@ class Retrieve:
         return None
 
     def _do_query(self, ss, peerid, storage_index, readsize):
+        started = time.time()
         self._queries_outstanding.add(peerid)
         d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])
-        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index))
+        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
+                      started)
         d.addErrback(self._query_failed, peerid)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
@@ -406,12 +426,19 @@ class Retrieve:
         verifier = rsa.create_verifying_key_from_string(pubkey_s)
         return verifier
 
-    def _got_results(self, datavs, peerid, readsize, stuff):
+    def _got_results(self, datavs, peerid, readsize, stuff, started):
+        elapsed = time.time() - started
+        if peerid not in self._status.timings["fetch_per_server"]:
+            self._status.timings["fetch_per_server"][peerid] = []
+        self._status.timings["fetch_per_server"][peerid].append(elapsed)
         self._queries_outstanding.discard(peerid)
         self._used_peers.add(peerid)
         if not self._running:
             return
 
+        if peerid not in self._status.sharemap:
+            self._status.sharemap[peerid] = set()
+
         for shnum,datav in datavs.items():
             data = datav[0]
             try:
@@ -447,16 +474,20 @@ class Retrieve:
             fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
             assert len(fingerprint) == 32
             if fingerprint != self._node._fingerprint:
+                self._status.problems[peerid] = "sh#%d: pubkey doesn't match fingerprint" % shnum
                 raise CorruptShareError(peerid, shnum,
                                         "pubkey doesn't match fingerprint")
             self._pubkey = self._deserialize_pubkey(pubkey_s)
             self._node._populate_pubkey(self._pubkey)
 
         verinfo = (seqnum, root_hash, IV, segsize, datalength)
+        self._status.sharemap[peerid].add(verinfo)
+
         if verinfo not in self._valid_versions:
             # it's a new pair. Verify the signature.
             valid = self._pubkey.verify(prefix, signature)
             if not valid:
+                self._status.problems[peerid] = "sh#%d: invalid signature" % shnum
                 raise CorruptShareError(peerid, shnum,
                                         "signature is invalid")
             # ok, it's a valid verinfo. Add it to the list of validated
@@ -486,11 +517,15 @@ class Retrieve:
         # rest of the shares), we need to implement the refactoring mentioned
         # above.
         if k != self._required_shares:
+            self._status.problems[peerid] = "sh#%d: k=%d, we want %d" \
+                                            % (shnum, k, self._required_shares)
             raise CorruptShareError(peerid, shnum,
                                     "share has k=%d, we want k=%d" %
                                     (k, self._required_shares))
 
         if N != self._total_shares:
+            self._status.problems[peerid] = "sh#%d: N=%d, we want %d" \
+                                            % (shnum, N, self._total_shares)
             raise CorruptShareError(peerid, shnum,
                                     "share has N=%d, we want N=%d" %
                                     (N, self._total_shares))
@@ -587,14 +622,19 @@ class Retrieve:
                      level=log.UNUSUAL)
         # are there any peers on the list that we haven't used?
         new_query_peers = []
-        for (peerid, ss) in self._peerlist:
+        peer_indicies = []
+        for i, (peerid, ss) in enumerate(self._peerlist):
             if peerid not in self._used_peers:
                 new_query_peers.append( (peerid, ss) )
+                peer_indicies.append(i)
                 if len(new_query_peers) > 5:
                     # only query in batches of 5. TODO: this is pretty
                     # arbitrary, really I want this to be something like
                     # k - max(known_version_sharecounts) + some extra
                     break
+        new_search_distance = max(max(peer_indicies),
+                                  self._status.get_search_distance())
+        self._status.set_search_distance(new_search_distance)
         if new_query_peers:
             self.log("sending %d new queries (read %d bytes)" %
                      (len(new_query_peers), self._read_size), level=log.UNUSUAL)
@@ -671,6 +711,8 @@ class Retrieve:
         # now that the big loop is done, all shares in the sharemap are
         # valid, and they're all for the same seqnum+root_hash version, so
         # it's now down to doing FEC and decrypt.
+        elapsed = time.time() - self._started
+        self._status.timings["fetch"] = elapsed
         assert len(shares) >= self._required_shares, len(shares)
         d = defer.maybeDeferred(self._decode, shares, segsize, datalength)
         d.addCallback(self._decrypt, IV, seqnum, root_hash)
@@ -728,8 +770,12 @@ class Retrieve:
 
         self.log("params %s, we have %d shares" % (params, len(shares)))
         self.log("about to decode, shareids=%s" % (shareids,))
+        started = time.time()
         d = defer.maybeDeferred(fec.decode, shares, shareids)
         def _done(buffers):
+            elapsed = time.time() - started
+            self._status.timings["decode"] = elapsed
+            self._status.set_encoding(self._required_shares, self._total_shares)
             self.log(" decode done, %d buffers" % len(buffers))
             segment = "".join(buffers)
             self.log(" joined length %d, datalength %d" %
@@ -745,9 +791,12 @@ class Retrieve:
         return d
 
     def _decrypt(self, crypttext, IV, seqnum, root_hash):
+        started = time.time()
         key = hashutil.ssk_readkey_data_hash(IV, self._readkey)
         decryptor = AES(key)
         plaintext = decryptor.process(crypttext)
+        elapsed = time.time() - started
+        self._status.timings["decrypt"] = elapsed
         # it worked, so record the seqnum and root_hash for next time
         self._node._populate_seqnum(seqnum)
         self._node._populate_root_hash(root_hash)
@@ -760,6 +809,8 @@ class Retrieve:
         self._status.set_status("Done")
         self._status.set_progress(1.0)
         self._status.set_size(len(contents))
+        elapsed = time.time() - self._started
+        self._status.timings["total"] = elapsed
         eventually(self._done_deferred.callback, contents)
 
     def get_status(self):
diff --git a/src/allmydata/web/retrieve-status.xhtml b/src/allmydata/web/retrieve-status.xhtml
index 422cdcd8..a6371ecd 100644
--- a/src/allmydata/web/retrieve-status.xhtml
+++ b/src/allmydata/web/retrieve-status.xhtml
@@ -19,4 +19,28 @@
   <li>Status: <span n:render="status"/></li>
 </ul>
 
+<h2>Retrieve Results</h2>
+<ul>
+  <li n:render="encoding" />
+  <li n:render="search_distance" />
+  <li n:render="problems" />
+  <li>Timings:</li>
+  <ul>
+    <li>Total: <span n:render="time" n:data="time_total" />
+    (<span n:render="rate" n:data="rate_total" />)</li>
+    <ul>
+      <li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
+      <li>Fetching: <span n:render="time" n:data="time_fetch" />
+      (<span n:render="rate" n:data="rate_fetch" />)</li>
+      <li>Decoding: <span n:render="time" n:data="time_decode" />
+      (<span n:render="rate" n:data="rate_decode" />)</li>
+      <li>Decrypting: <span n:render="time" n:data="time_decrypt" />
+      (<span n:render="rate" n:data="rate_decrypt" />)</li>
+    </ul>
+    <li n:render="server_timings" />
+  </ul>
+</ul>
+
+<div>Return to the <a href="/">Welcome Page</a></div>
+
 </body></html>
diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py
index bae3d7d1..1cab77a3 100644
--- a/src/allmydata/webish.py
+++ b/src/allmydata/webish.py
@@ -1373,12 +1373,42 @@ class UnlinkedPUTCreateDirectory(rend.Page):
         # XXX add redirect_to_result
         return d
 
-def plural(sequence):
-    if len(sequence) == 1:
+def plural(sequence_or_length):
+    if isinstance(sequence_or_length, int):
+        length = sequence_or_length
+    else:
+        length = len(sequence_or_length)
+    if length == 1:
         return ""
     return "s"
 
-class UploadResultsRendererMixin:
+class RateAndTimeMixin:
+
+    def render_time(self, ctx, data):
+        # 1.23s, 790ms, 132us
+        if data is None:
+            return ""
+        s = float(data)
+        if s >= 1.0:
+            return "%.2fs" % s
+        if s >= 0.01:
+            return "%dms" % (1000*s)
+        if s >= 0.001:
+            return "%.1fms" % (1000*s)
+        return "%dus" % (1000000*s)
+
+    def render_rate(self, ctx, data):
+        # 21.8kBps, 554.4kBps 4.37MBps
+        if data is None:
+            return ""
+        r = float(data)
+        if r > 1000000:
+            return "%1.2fMBps" % (r/1000000)
+        if r > 1000:
+            return "%.1fkBps" % (r/1000)
+        return "%dBps" % r
+
+class UploadResultsRendererMixin(RateAndTimeMixin):
     # this requires a method named 'upload_results'
 
     def render_sharemap(self, ctx, data):
@@ -1417,30 +1447,6 @@ class UploadResultsRendererMixin:
         d.addCallback(lambda res: res.file_size)
         return d
 
-    def render_time(self, ctx, data):
-        # 1.23s, 790ms, 132us
-        if data is None:
-            return ""
-        s = float(data)
-        if s >= 1.0:
-            return "%.2fs" % s
-        if s >= 0.01:
-            return "%dms" % (1000*s)
-        if s >= 0.001:
-            return "%.1fms" % (1000*s)
-        return "%dus" % (1000000*s)
-
-    def render_rate(self, ctx, data):
-        # 21.8kBps, 554.4kBps 4.37MBps
-        if data is None:
-            return ""
-        r = float(data)
-        if r > 1000000:
-            return "%1.2fMBps" % (r/1000000)
-        if r > 1000:
-            return "%.1fkBps" % (r/1000)
-        return "%dBps" % r
-
     def _get_time(self, name):
         d = self.upload_results()
         d.addCallback(lambda res: res.timings.get(name))
@@ -1678,7 +1684,7 @@ class UploadStatusPage(UploadResultsRendererMixin, rend.Page):
     def render_status(self, ctx, data):
         return data.get_status()
 
-class DownloadResultsRendererMixin:
+class DownloadResultsRendererMixin(RateAndTimeMixin):
     # this requires a method named 'download_results'
 
     def render_servermap(self, ctx, data):
@@ -1730,30 +1736,6 @@ class DownloadResultsRendererMixin:
         d.addCallback(lambda res: res.file_size)
         return d
 
-    def render_time(self, ctx, data):
-        # 1.23s, 790ms, 132us
-        if data is None:
-            return ""
-        s = float(data)
-        if s >= 1.0:
-            return "%.2fs" % s
-        if s >= 0.01:
-            return "%dms" % (1000*s)
-        if s >= 0.001:
-            return "%.1fms" % (1000*s)
-        return "%dus" % (1000000*s)
-
-    def render_rate(self, ctx, data):
-        # 21.8kBps, 554.4kBps 4.37MBps
-        if data is None:
-            return ""
-        r = float(data)
-        if r > 1000000:
-            return "%1.2fMBps" % (r/1000000)
-        if r > 1000:
-            return "%.1fkBps" % (r/1000)
-        return "%dBps" % r
-
     def _get_time(self, name):
         d = self.download_results()
         d.addCallback(lambda res: res.timings.get(name))
@@ -1877,9 +1859,13 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
     def render_status(self, ctx, data):
         return data.get_status()
 
-class RetrieveStatusPage(rend.Page):
+class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
     docFactory = getxmlfile("retrieve-status.xhtml")
 
+    def __init__(self, data):
+        rend.Page.__init__(self, data)
+        self.retrieve_status = data
+
     def render_started(self, ctx, data):
         TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
         started_s = time.strftime(TIME_FORMAT,
@@ -1910,6 +1896,70 @@ class RetrieveStatusPage(rend.Page):
     def render_status(self, ctx, data):
         return data.get_status()
 
+    def render_encoding(self, ctx, data):
+        k, n = data.get_encoding()
+        return ctx.tag["Encoding: %s of %s" % (k, n)]
+
+    def render_search_distance(self, ctx, data):
+        d = data.get_search_distance()
+        return ctx.tag["Search Distance: %s peer%s" % (d, plural(d))]
+
+    def render_problems(self, ctx, data):
+        problems = data.problems
+        if not problems:
+            return ""
+        l = T.ul()
+        for peerid in sorted(problems.keys()):
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]]
+        return ctx.tag["Server Problems:", l]
+
+    def _get_rate(self, data, name):
+        file_size = self.retrieve_status.get_size()
+        time = self.retrieve_status.timings.get(name)
+        if time is None:
+            return None
+        try:
+            return 1.0 * file_size / time
+        except ZeroDivisionError:
+            return None
+
+    def data_time_total(self, ctx, data):
+        return self.retrieve_status.timings.get("total")
+    def data_rate_total(self, ctx, data):
+        return self._get_rate(data, "total")
+
+    def data_time_peer_selection(self, ctx, data):
+        return self.retrieve_status.timings.get("peer_selection")
+
+    def data_time_fetch(self, ctx, data):
+        return self.retrieve_status.timings.get("fetch")
+    def data_rate_fetch(self, ctx, data):
+        return self._get_rate(data, "fetch")
+
+    def data_time_decode(self, ctx, data):
+        return self.retrieve_status.timings.get("decode")
+    def data_rate_decode(self, ctx, data):
+        return self._get_rate(data, "decode")
+
+    def data_time_decrypt(self, ctx, data):
+        return self.retrieve_status.timings.get("decrypt")
+    def data_rate_decrypt(self, ctx, data):
+        return self._get_rate(data, "decrypt")
+
+    def render_server_timings(self, ctx, data):
+        per_server = self.retrieve_status.timings.get("fetch_per_server")
+        if not per_server:
+            return ""
+        l = T.ul()
+        for peerid in sorted(per_server.keys()):
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            times_s = ", ".join([self.render_time(None, t)
+                                 for t in per_server[peerid]])
+            l[T.li["[%s]: %s" % (peerid_s, times_s)]]
+        return T.li["Per-Server Fetch Response Times: ", l]
+
+
 class PublishStatusPage(rend.Page):
     docFactory = getxmlfile("publish-status.xhtml")
 
@@ -1956,9 +2006,9 @@ class Status(rend.Page):
 
     def data_recent_operations(self, ctx, data):
         recent = [o for o in (IClient(ctx).list_recent_uploads() +
-                            IClient(ctx).list_recent_downloads() +
-                            IClient(ctx).list_recent_publish() +
-                            IClient(ctx).list_recent_retrieve())
+                              IClient(ctx).list_recent_downloads() +
+                              IClient(ctx).list_recent_publish() +
+                              IClient(ctx).list_recent_retrieve())
                   if not o.get_active()]
         recent.sort(lambda a,b: cmp(a.get_started(), b.get_started()))
         recent.reverse()
-- 
2.45.2