]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
webish: add publish status
authorBrian Warner <warner@allmydata.com>
Thu, 6 Mar 2008 01:41:10 +0000 (18:41 -0700)
committerBrian Warner <warner@allmydata.com>
Thu, 6 Mar 2008 01:41:10 +0000 (18:41 -0700)
src/allmydata/mutable.py
src/allmydata/web/publish-status.xhtml
src/allmydata/web/retrieve-status.xhtml
src/allmydata/web/status.py

index 0cf7eed4c16a214a6a58eb7a059a4efd92fe9be6..8d861ea6680abafadf22ccafc013ded6d63b8d50 100644 (file)
@@ -248,6 +248,7 @@ class RetrieveStatus:
     def __init__(self):
         self.timings = {}
         self.timings["fetch_per_server"] = {}
+        self.timings["cumulative_verify"] = 0.0
         self.sharemap = {}
         self.problems = {}
         self.active = True
@@ -534,11 +535,18 @@ class Retrieve:
 
         if verinfo not in self._valid_versions:
             # it's a new pair. Verify the signature.
+            started = time.time()
             valid = self._pubkey.verify(prefix, signature)
+            # this records the total verification time for all versions we've
+            # seen. This time is included in "fetch".
+            elapsed = time.time() - started
+            self._status.timings["cumulative_verify"] += elapsed
+
             if not valid:
                 self._status.problems[peerid] = "sh#%d: invalid signature" % shnum
                 raise CorruptShareError(peerid, shnum,
                                         "signature is invalid")
+
             # ok, it's a valid verinfo. Add it to the list of validated
             # versions.
             self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
@@ -878,20 +886,34 @@ class PublishStatus:
     statusid_counter = count(0)
     def __init__(self):
         self.timings = {}
-        self.sharemap = None
+        self.timings["per_server"] = {}
+        self.privkey_from = None
+        self.peers_queried = None
+        self.sharemap = None # DictOfSets
+        self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
+        self.encoding = ("?", "?")
+        self.initial_read_size = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
         self.counter = self.statusid_counter.next()
         self.started = time.time()
 
+    def add_per_server_time(self, peerid, op, elapsed):
+        assert op in ("read", "write")
+        if peerid not in self.timings["per_server"]:
+            self.timings["per_server"][peerid] = []
+        self.timings["per_server"][peerid].append((op,elapsed))
+
     def get_started(self):
         return self.started
     def get_storage_index(self):
         return self.storage_index
+    def get_encoding(self):
+        return self.encoding
     def using_helper(self):
         return self.helper
     def get_size(self):
@@ -909,6 +931,8 @@ class PublishStatus:
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
+    def set_encoding(self, k, n):
+        self.encoding = (k, n)
     def set_size(self, size):
         self.size = size
     def set_status(self, status):
@@ -932,6 +956,7 @@ class Publish:
         self._status.set_helper(False)
         self._status.set_progress(0.0)
         self._status.set_active(True)
+        self._started = time.time()
 
     def log(self, *args, **kwargs):
         if 'parent' not in kwargs:
@@ -962,7 +987,6 @@ class Publish:
         # 5: when enough responses are back, we're done
 
         self.log("starting publish, datalen is %s" % len(newdata))
-        self._started = time.time()
         self._status.set_size(len(newdata))
 
         self._writekey = self._node.get_writekey()
@@ -978,6 +1002,7 @@ class Publish:
         required_shares = self._node.get_required_shares()
         total_shares = self._node.get_total_shares()
         self._pubkey = self._node.get_pubkey()
+        self._status.set_encoding(required_shares, total_shares)
 
         # these two may not be, we might have to get them from the first peer
         self._privkey = self._node.get_privkey()
@@ -995,10 +1020,13 @@ class Publish:
         # with up to 7 entries, allowing us to make an update in 2 RTT
         # instead of 3.
         self._read_size = 1000
+        self._status.initial_read_size = self._read_size
 
         d = defer.succeed(total_shares)
         d.addCallback(self._query_peers)
+        d.addCallback(self._query_peers_done)
         d.addCallback(self._obtain_privkey)
+        d.addCallback(self._obtain_privkey_done)
 
         d.addCallback(self._encrypt_and_encode, newdata, readkey, IV,
                       required_shares, total_shares)
@@ -1011,6 +1039,9 @@ class Publish:
 
     def _query_peers(self, total_shares):
         self.log("_query_peers")
+        self._query_peers_started = now = time.time()
+        elapsed = now - self._started
+        self._status.timings["setup"] = elapsed
 
         storage_index = self._storage_index
 
@@ -1040,16 +1071,18 @@ class Publish:
         EPSILON = total_shares / 2
         #partial_peerlist = islice(peerlist, total_shares + EPSILON)
         partial_peerlist = peerlist[:total_shares+EPSILON]
+        self._status.peers_queried = len(partial_peerlist)
 
         self._storage_servers = {}
 
+        started = time.time()
         dl = []
         for permutedid, (peerid, ss) in enumerate(partial_peerlist):
             self._storage_servers[peerid] = ss
             d = self._do_query(ss, peerid, storage_index)
             d.addCallback(self._got_query_results,
                           peerid, permutedid,
-                          reachable_peers, current_share_peers)
+                          reachable_peers, current_share_peers, started)
             dl.append(d)
         d = defer.DeferredList(dl)
         d.addCallback(self._got_all_query_results,
@@ -1072,10 +1105,13 @@ class Publish:
         return d
 
     def _got_query_results(self, datavs, peerid, permutedid,
-                           reachable_peers, current_share_peers):
+                           reachable_peers, current_share_peers, started):
 
         lp = self.log(format="_got_query_results from %(peerid)s",
                       peerid=idlib.shortnodeid_b2a(peerid))
+        elapsed = time.time() - started
+        self._status.add_per_server_time(peerid, "read", elapsed)
+
         assert isinstance(datavs, dict)
         reachable_peers[peerid] = permutedid
         if not datavs:
@@ -1164,6 +1200,7 @@ class Publish:
                                total_shares, reachable_peers,
                                current_share_peers):
         self.log("_got_all_query_results")
+
         # now that we know everything about the shares currently out there,
         # decide where to place the new shares.
 
@@ -1230,16 +1267,25 @@ class Publish:
         target_info = (target_map, shares_per_peer)
         return target_info
 
+    def _query_peers_done(self, target_info):
+        self._obtain_privkey_started = now = time.time()
+        elapsed = time.time() - self._query_peers_started
+        self._status.timings["query"] = elapsed
+        return target_info
+
     def _obtain_privkey(self, target_info):
         # make sure we've got a copy of our private key.
         if self._privkey:
             # Must have picked it up during _query_peers. We're good to go.
+            if "privkey_fetch" not in self._status.timings:
+                self._status.timings["privkey_fetch"] = 0.0
             return target_info
 
         # Nope, we haven't managed to grab a copy, and we still need it. Ask
         # peers one at a time until we get a copy. Only bother asking peers
         # who've admitted to holding a share.
 
+        self._privkey_fetch_started = time.time()
         target_map, shares_per_peer = target_info
         # pull shares from self._encprivkey_shares
         if not self._encprivkey_shares:
@@ -1255,25 +1301,44 @@ class Publish:
         return d
 
     def _do_privkey_query(self, rref, peerid, shnum, offset, length):
+        started = time.time()
         d = rref.callRemote("slot_readv", self._storage_index,
                             [shnum], [(offset, length)] )
-        d.addCallback(self._privkey_query_response, peerid, shnum)
+        d.addCallback(self._privkey_query_response, peerid, shnum, started)
         return d
 
-    def _privkey_query_response(self, datav, peerid, shnum):
+    def _privkey_query_response(self, datav, peerid, shnum, started):
+        elapsed = time.time() - started
+        self._status.add_per_server_time(peerid, "read", elapsed)
+
         data = datav[shnum][0]
         self._try_to_validate_privkey(data, peerid, shnum)
 
+        elapsed = time.time() - self._privkey_fetch_started
+        self._status.timings["privkey_fetch"] = elapsed
+        self._status.privkey_from = peerid
+
+    def _obtain_privkey_done(self, target_info):
+        elapsed = time.time() - self._obtain_privkey_started
+        self._status.timings["privkey"] = elapsed
+        return target_info
+
     def _encrypt_and_encode(self, target_info,
                             newdata, readkey, IV,
                             required_shares, total_shares):
         self.log("_encrypt_and_encode")
 
+        started = time.time()
+
         key = hashutil.ssk_readkey_data_hash(IV, readkey)
         enc = AES(key)
         crypttext = enc.process(newdata)
         assert len(crypttext) == len(newdata)
 
+        now = time.time()
+        self._status.timings["encrypt"] = now - started
+        started = now
+
         # now apply FEC
         self.MAX_SEGMENT_SIZE = 1024*1024
         data_length = len(crypttext)
@@ -1298,6 +1363,11 @@ class Publish:
             assert len(piece) == piece_size
 
         d = fec.encode(crypttext_pieces)
+        def _done_encoding(res):
+            elapsed = time.time() - started
+            self._status.timings["encode"] = elapsed
+            return res
+        d.addCallback(_done_encoding)
         d.addCallback(lambda shares_and_shareids:
                       (shares_and_shareids,
                        required_shares, total_shares,
@@ -1311,6 +1381,7 @@ class Publish:
                                 target_info),
                          seqnum, IV):
         self.log("_generate_shares")
+        started = time.time()
 
         # we should know these by now
         privkey = self._privkey
@@ -1356,7 +1427,9 @@ class Publish:
         # then they all share the same encprivkey at the end. The sizes
         # of everything are the same for all shares.
 
+        sign_started = time.time()
         signature = privkey.sign(prefix)
+        self._status.timings["sign"] = time.time() - sign_started
 
         verification_key = pubkey.serialize()
 
@@ -1370,11 +1443,15 @@ class Publish:
                                      all_shares[shnum],
                                      encprivkey)
             final_shares[shnum] = final_share
+        elapsed = time.time() - started
+        self._status.timings["pack"] = elapsed
         return (seqnum, root_hash, final_shares, target_info)
 
 
     def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV):
         self.log("_send_shares")
+        started = time.time()
+
         # we're finally ready to send out our shares. If we encounter any
         # surprises here, it's because somebody else is writing at the same
         # time. (Note: in the future, when we remove the _query_peers() step
@@ -1417,10 +1494,17 @@ class Publish:
             d = self._do_testreadwrite(peerid, secrets,
                                        tw_vectors, read_vector)
             d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
-                          peerid, expected_old_shares[peerid], dispatch_map)
+                          peerid, expected_old_shares[peerid], dispatch_map,
+                          started)
             dl.append(d)
 
         d = defer.DeferredList(dl)
+        def _done_sending(res):
+            elapsed = time.time() - started
+            self._status.timings["push"] = elapsed
+            self._status.sharemap = dispatch_map
+            return res
+        d.addCallback(_done_sending)
         d.addCallback(lambda res: (self._surprised, dispatch_map))
         return d
 
@@ -1438,9 +1522,12 @@ class Publish:
 
     def _got_write_answer(self, answer, tw_vectors, my_checkstring,
                           peerid, expected_old_shares,
-                          dispatch_map):
+                          dispatch_map, started):
         lp = self.log("_got_write_answer from %s" %
                       idlib.shortnodeid_b2a(peerid))
+        elapsed = time.time() - started
+        self._status.add_per_server_time(peerid, "write", elapsed)
+
         wrote, read_data = answer
         surprised = False
 
index f6b862c33230470d2a68ef7149fbfee66d1e2c75..9250371b43081d9051f620617a6c37c070481b88 100644 (file)
   <li>Status: <span n:render="status"/></li>
 </ul>
 
+<h2>Publish Results</h2>
+<ul>
+  <li n:render="encoding" />
+  <li n:render="peers_queried" />
+  <li n:render="problems" />
+  <li n:render="sharemap" />
+  <li>Timings:</li>
+  <ul>
+    <li>Total: <span n:render="time" n:data="time_total" />
+    (<span n:render="rate" n:data="rate_total" />)</li>
+    <ul>
+      <li>Setup: <span n:render="time" n:data="time_setup" /></li>
+      <li>Initial Version Query: <span n:render="time" n:data="time_query" />
+      (read size <span n:render="string" n:data="initial_read_size"/> bytes)</li>
+      <li>Obtain Privkey: <span n:render="time" n:data="time_privkey" />
+      <ul>
+        <li>Separate Privkey Fetch: <span n:render="time" n:data="time_privkey_fetch" /> <span n:render="privkey_from"/></li>
+      </ul></li>
+      <li>Encrypting: <span n:render="time" n:data="time_encrypt" />
+      (<span n:render="rate" n:data="rate_encrypt" />)</li>
+      <li>Encoding: <span n:render="time" n:data="time_encode" />
+      (<span n:render="rate" n:data="rate_encode" />)</li>
+      <li>Packing Shares: <span n:render="time" n:data="time_pack" />
+      (<span n:render="rate" n:data="rate_pack" />)
+      <ul>
+        <li>RSA Signature: <span n:render="time" n:data="time_sign" /></li>
+      </ul></li>
+
+      <li>Pushing: <span n:render="time" n:data="time_push" />
+      (<span n:render="rate" n:data="rate_push" />)</li>
+    </ul>
+    <li n:render="server_timings" />
+  </ul>
+</ul>
+
+<div>Return to the <a href="/">Welcome Page</a></div>
+
 </body></html>
index a6371ecd9da584eeaaf66f49e478a6ef91c83133..fe0b9e330727665b8ec5b65de7a7bb031db8fb66 100644 (file)
     <ul>
       <li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
       <li>Fetching: <span n:render="time" n:data="time_fetch" />
-      (<span n:render="rate" n:data="rate_fetch" />)</li>
+      (<span n:render="rate" n:data="rate_fetch" />)
+      <ul>
+        <li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
+      </ul></li>
       <li>Decoding: <span n:render="time" n:data="time_decode" />
       (<span n:render="rate" n:data="rate_decode" />)</li>
       <li>Decrypting: <span n:render="time" n:data="time_decrypt" />
index 373ab6892398e4caab81845b812bbddb58ed1e57..28f5df37b55c4bae2c1035ba1b4d2c678744ebe7 100644 (file)
@@ -492,6 +492,9 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
     def data_rate_fetch(self, ctx, data):
         return self._get_rate(data, "fetch")
 
+    def data_time_cumulative_verify(self, ctx, data):
+        return self.retrieve_status.timings.get("cumulative_verify")
+
     def data_time_decode(self, ctx, data):
         return self.retrieve_status.timings.get("decode")
     def data_rate_decode(self, ctx, data):
@@ -515,9 +518,13 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
         return T.li["Per-Server Fetch Response Times: ", l]
 
 
-class PublishStatusPage(rend.Page):
+class PublishStatusPage(rend.Page, RateAndTimeMixin):
     docFactory = getxmlfile("publish-status.xhtml")
 
+    def __init__(self, data):
+        rend.Page.__init__(self, data)
+        self.publish_status = data
+
     def render_started(self, ctx, data):
         TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
         started_s = time.strftime(TIME_FORMAT,
@@ -548,6 +555,111 @@ class PublishStatusPage(rend.Page):
     def render_status(self, ctx, data):
         return data.get_status()
 
+    def render_encoding(self, ctx, data):
+        k, n = data.get_encoding()
+        return ctx.tag["Encoding: %s of %s" % (k, n)]
+
+    def render_peers_queried(self, ctx, data):
+        return ctx.tag["Peers Queried: ", data.peers_queried]
+
+    def render_sharemap(self, ctx, data):
+        sharemap = data.sharemap
+        if sharemap is None:
+            return ctx.tag["None"]
+        l = T.ul()
+        for shnum in sorted(sharemap.keys()):
+            l[T.li["%d -> Placed on " % shnum,
+                   ", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
+                              for (peerid,seqnum,root_hash)
+                              in sharemap[shnum]])]]
+        return ctx.tag["Sharemap:", l]
+
+    def render_problems(self, ctx, data):
+        problems = data.problems
+        if not problems:
+            return ""
+        l = T.ul()
+        for peerid in sorted(problems.keys()):
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]]
+        return ctx.tag["Server Problems:", l]
+
+    def _get_rate(self, data, name):
+        file_size = self.publish_status.get_size()
+        time = self.publish_status.timings.get(name)
+        if time is None:
+            return None
+        try:
+            return 1.0 * file_size / time
+        except ZeroDivisionError:
+            return None
+
+    def data_time_total(self, ctx, data):
+        return self.publish_status.timings.get("total")
+    def data_rate_total(self, ctx, data):
+        return self._get_rate(data, "total")
+
+    def data_time_setup(self, ctx, data):
+        return self.publish_status.timings.get("setup")
+
+    def data_time_query(self, ctx, data):
+        return self.publish_status.timings.get("query")
+
+    def data_time_privkey(self, ctx, data):
+        return self.publish_status.timings.get("privkey")
+
+    def data_time_privkey_fetch(self, ctx, data):
+        return self.publish_status.timings.get("privkey_fetch")
+    def render_privkey_from(self, ctx, data):
+        peerid = data.privkey_from
+        if peerid:
+            return " (got from [%s])" % idlib.shortnodeid_b2a(peerid)
+        else:
+            return ""
+
+    def data_time_encrypt(self, ctx, data):
+        return self.publish_status.timings.get("encrypt")
+    def data_rate_encrypt(self, ctx, data):
+        return self._get_rate(data, "encrypt")
+
+    def data_time_encode(self, ctx, data):
+        return self.publish_status.timings.get("encode")
+    def data_rate_encode(self, ctx, data):
+        return self._get_rate(data, "encode")
+
+    def data_time_pack(self, ctx, data):
+        return self.publish_status.timings.get("pack")
+    def data_rate_pack(self, ctx, data):
+        return self._get_rate(data, "pack")
+    def data_time_sign(self, ctx, data):
+        return self.publish_status.timings.get("sign")
+
+    def data_time_push(self, ctx, data):
+        return self.publish_status.timings.get("push")
+    def data_rate_push(self, ctx, data):
+        return self._get_rate(data, "push")
+
+    def data_initial_read_size(self, ctx, data):
+        return self.publish_status.initial_read_size
+
+    def render_server_timings(self, ctx, data):
+        per_server = self.publish_status.timings.get("per_server")
+        if not per_server:
+            return ""
+        l = T.ul()
+        for peerid in sorted(per_server.keys()):
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            times = []
+            for op,t in per_server[peerid]:
+                if op == "read":
+                    times.append( "(" + self.render_time(None, t) + ")" )
+                else:
+                    times.append( self.render_time(None, t) )
+            times_s = ", ".join(times)
+            l[T.li["[%s]: %s" % (peerid_s, times_s)]]
+        return T.li["Per-Server Response Times: ", l]
+
+
 class Status(rend.Page):
     docFactory = getxmlfile("status.xhtml")
     addSlash = True