webish: add publish status
diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py
index 0cf7eed4c16a214a6a58eb7a059a4efd92fe9be6..8d861ea6680abafadf22ccafc013ded6d63b8d50 100644
--- a/src/allmydata/mutable.py
+++ b/src/allmydata/mutable.py
@@ -248,6 +248,7 @@ class RetrieveStatus:
     def __init__(self):
         self.timings = {}
         self.timings["fetch_per_server"] = {}
+        self.timings["cumulative_verify"] = 0.0
         self.sharemap = {}
         self.problems = {}
         self.active = True
@@ -534,11 +535,18 @@ class Retrieve:
 
         if verinfo not in self._valid_versions:
             # it's a new pair. Verify the signature.
+            started = time.time()
             valid = self._pubkey.verify(prefix, signature)
+            # This accumulates the total verification time across all the
+            # versions we've seen; it is already included in the "fetch" time.
+            elapsed = time.time() - started
+            self._status.timings["cumulative_verify"] += elapsed
+
             if not valid:
                 self._status.problems[peerid] = "sh#%d: invalid signature" % shnum
                 raise CorruptShareError(peerid, shnum,
                                         "signature is invalid")
+
             # ok, it's a valid verinfo. Add it to the list of validated
             # versions.
             self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
@@ -878,20 +886,34 @@ class PublishStatus:
     statusid_counter = count(0)
     def __init__(self):
         self.timings = {}
-        self.sharemap = None
+        self.timings["per_server"] = {}
+        self.privkey_from = None
+        self.peers_queried = None
+        self.sharemap = None # DictOfSets
+        self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
+        self.encoding = ("?", "?")
+        self.initial_read_size = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
         self.counter = self.statusid_counter.next()
         self.started = time.time()
 
+    def add_per_server_time(self, peerid, op, elapsed):
+        assert op in ("read", "write")
+        if peerid not in self.timings["per_server"]:
+            self.timings["per_server"][peerid] = []
+        self.timings["per_server"][peerid].append((op,elapsed))
+
     def get_started(self):
         return self.started
     def get_storage_index(self):
         return self.storage_index
+    def get_encoding(self):
+        return self.encoding
     def using_helper(self):
         return self.helper
     def get_size(self):
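add_per_server_time keeps an ordered list of (op, elapsed) pairs per peer, so
a peer that is read from during the query phase and written to during the push
phase ends up with both entries, in order. A hypothetical usage sketch,
assuming we are inside this module where PublishStatus is defined (the peerid
strings are made up; the real keys are binary node ids):

    status = PublishStatus()
    status.add_per_server_time("peer-a", "read", 0.012)
    status.add_per_server_time("peer-a", "write", 0.034)
    assert status.timings["per_server"]["peer-a"] == [("read", 0.012),
                                                      ("write", 0.034)]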
@@ -909,6 +931,8 @@ class PublishStatus:
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
+    def set_encoding(self, k, n):
+        self.encoding = (k, n)
     def set_size(self, size):
         self.size = size
     def set_status(self, status):
@@ -932,6 +956,7 @@ class Publish:
         self._status.set_helper(False)
         self._status.set_progress(0.0)
         self._status.set_active(True)
+        self._started = time.time()
 
     def log(self, *args, **kwargs):
         if 'parent' not in kwargs:
@@ -962,7 +987,6 @@ class Publish:
         # 5: when enough responses are back, we're done
 
         self.log("starting publish, datalen is %s" % len(newdata))
-        self._started = time.time()
         self._status.set_size(len(newdata))
 
         self._writekey = self._node.get_writekey()
@@ -978,6 +1002,7 @@ class Publish:
         required_shares = self._node.get_required_shares()
         total_shares = self._node.get_total_shares()
         self._pubkey = self._node.get_pubkey()
+        self._status.set_encoding(required_shares, total_shares)
 
         # these two may not be, we might have to get them from the first peer
         self._privkey = self._node.get_privkey()
@@ -995,10 +1020,13 @@ class Publish:
         # with up to 7 entries, allowing us to make an update in 2 RTT
         # instead of 3.
         self._read_size = 1000
+        self._status.initial_read_size = self._read_size
 
         d = defer.succeed(total_shares)
         d.addCallback(self._query_peers)
+        d.addCallback(self._query_peers_done)
         d.addCallback(self._obtain_privkey)
+        d.addCallback(self._obtain_privkey_done)
 
         d.addCallback(self._encrypt_and_encode, newdata, readkey, IV,
                       required_shares, total_shares)
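The two *_done callbacks spliced into the chain above are deliberately
transparent (their definitions appear further down): each records one timing
entry and returns its argument unchanged, so adding them cannot disturb the
target_info flowing from _query_peers to _obtain_privkey. A minimal sketch of
that checkpoint pattern (names are illustrative; assumes Twisted, which this
module already depends on):

    import time
    from twisted.internet import defer

    timings = {}

    def checkpoint(name, started):
        def _record(res):
            timings[name] = time.time() - started
            return res  # pass the chained value through untouched
        return _record

    started = time.time()
    d = defer.succeed("some-result")
    d.addCallback(checkpoint("query", started))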
@@ -1011,6 +1039,9 @@ class Publish:
 
     def _query_peers(self, total_shares):
         self.log("_query_peers")
+        self._query_peers_started = now = time.time()
+        elapsed = now - self._started
+        self._status.timings["setup"] = elapsed
 
         storage_index = self._storage_index
 
@@ -1040,16 +1071,18 @@ class Publish:
         EPSILON = total_shares / 2
         #partial_peerlist = islice(peerlist, total_shares + EPSILON)
         partial_peerlist = peerlist[:total_shares+EPSILON]
+        self._status.peers_queried = len(partial_peerlist)
 
         self._storage_servers = {}
 
+        started = time.time()
         dl = []
         for permutedid, (peerid, ss) in enumerate(partial_peerlist):
             self._storage_servers[peerid] = ss
             d = self._do_query(ss, peerid, storage_index)
             d.addCallback(self._got_query_results,
                           peerid, permutedid,
-                          reachable_peers, current_share_peers)
+                          reachable_peers, current_share_peers, started)
             dl.append(d)
         d = defer.DeferredList(dl)
         d.addCallback(self._got_all_query_results,
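For concreteness, with Tahoe's default 3-of-10 encoding the fan-out works out
to 15 peers: EPSILON is half of total_shares (integer division under Python
2), and peers_queried records how many peers were actually contacted. A worked
example, not code from the module:

    total_shares = 10             # N in a 3-of-10 encoding
    EPSILON = total_shares // 2   # same value as total_shares / 2 under py2
    peers_queried = total_shares + EPSILON
    assert peers_queried == 15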
@@ -1072,10 +1105,13 @@ class Publish:
         return d
 
     def _got_query_results(self, datavs, peerid, permutedid,
-                           reachable_peers, current_share_peers):
+                           reachable_peers, current_share_peers, started):
 
         lp = self.log(format="_got_query_results from %(peerid)s",
                       peerid=idlib.shortnodeid_b2a(peerid))
+        elapsed = time.time() - started
+        self._status.add_per_server_time(peerid, "read", elapsed)
+
         assert isinstance(datavs, dict)
         reachable_peers[peerid] = permutedid
         if not datavs:
@@ -1164,6 +1200,7 @@ class Publish:
                                total_shares, reachable_peers,
                                current_share_peers):
         self.log("_got_all_query_results")
+
         # now that we know everything about the shares currently out there,
         # decide where to place the new shares.
 
@@ -1230,16 +1267,25 @@ class Publish:
         target_info = (target_map, shares_per_peer)
         return target_info
 
+    def _query_peers_done(self, target_info):
+        self._obtain_privkey_started = now = time.time()
+        elapsed = now - self._query_peers_started
+        self._status.timings["query"] = elapsed
+        return target_info
+
     def _obtain_privkey(self, target_info):
         # make sure we've got a copy of our private key.
         if self._privkey:
             # Must have picked it up during _query_peers. We're good to go.
+            if "privkey_fetch" not in self._status.timings:
+                self._status.timings["privkey_fetch"] = 0.0
             return target_info
 
         # Nope, we haven't managed to grab a copy, and we still need it. Ask
         # peers one at a time until we get a copy. Only bother asking peers
         # who've admitted to holding a share.
 
+        self._privkey_fetch_started = time.time()
         target_map, shares_per_peer = target_info
         # pull shares from self._encprivkey_shares
         if not self._encprivkey_shares:
@@ -1255,25 +1301,44 @@ class Publish:
         return d
 
     def _do_privkey_query(self, rref, peerid, shnum, offset, length):
+        started = time.time()
         d = rref.callRemote("slot_readv", self._storage_index,
                             [shnum], [(offset, length)] )
-        d.addCallback(self._privkey_query_response, peerid, shnum)
+        d.addCallback(self._privkey_query_response, peerid, shnum, started)
         return d
 
-    def _privkey_query_response(self, datav, peerid, shnum):
+    def _privkey_query_response(self, datav, peerid, shnum, started):
+        elapsed = time.time() - started
+        self._status.add_per_server_time(peerid, "read", elapsed)
+
         data = datav[shnum][0]
         self._try_to_validate_privkey(data, peerid, shnum)
 
+        elapsed = time.time() - self._privkey_fetch_started
+        self._status.timings["privkey_fetch"] = elapsed
+        self._status.privkey_from = peerid
+
+    def _obtain_privkey_done(self, target_info):
+        elapsed = time.time() - self._obtain_privkey_started
+        self._status.timings["privkey"] = elapsed
+        return target_info
+
     def _encrypt_and_encode(self, target_info,
                             newdata, readkey, IV,
                             required_shares, total_shares):
         self.log("_encrypt_and_encode")
 
+        started = time.time()
+
         key = hashutil.ssk_readkey_data_hash(IV, readkey)
         enc = AES(key)
         crypttext = enc.process(newdata)
         assert len(crypttext) == len(newdata)
 
+        now = time.time()
+        self._status.timings["encrypt"] = now - started
+        started = now
+
         # now apply FEC
         self.MAX_SEGMENT_SIZE = 1024*1024
         data_length = len(crypttext)
@@ -1298,6 +1363,11 @@ class Publish:
             assert len(piece) == piece_size
 
         d = fec.encode(crypttext_pieces)
+        def _done_encoding(res):
+            elapsed = time.time() - started
+            self._status.timings["encode"] = elapsed
+            return res
+        d.addCallback(_done_encoding)
         d.addCallback(lambda shares_and_shareids:
                       (shares_and_shareids,
                        required_shares, total_shares,
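Two details worth noting above: started is reused as a rolling checkpoint (it
is reset to now after the AES pass, so "encrypt" and "encode" measure disjoint
intervals), and because fec.encode returns a Deferred, the "encode" interval
has to be closed out inside the _done_encoding callback that closes over
started. A condensed sketch of timing a Deferred-returning stage (fec_encode
is a stand-in for the real zfec-backed encoder):

    import time

    timings = {}

    def encode_with_timing(fec_encode, pieces):
        # fec_encode returns a Deferred that fires with the encoded shares
        started = time.time()
        d = fec_encode(pieces)
        def _done_encoding(res):
            timings["encode"] = time.time() - started
            return res  # hand the shares on to the next callback
        d.addCallback(_done_encoding)
        return d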
@@ -1311,6 +1381,7 @@ class Publish:
                                 target_info),
                          seqnum, IV):
         self.log("_generate_shares")
+        started = time.time()
 
         # we should know these by now
         privkey = self._privkey
@@ -1356,7 +1427,9 @@ class Publish:
         # then they all share the same encprivkey at the end. The sizes
         # of everything are the same for all shares.
 
+        sign_started = time.time()
         signature = privkey.sign(prefix)
+        self._status.timings["sign"] = time.time() - sign_started
 
         verification_key = pubkey.serialize()
 
@@ -1370,11 +1443,15 @@ class Publish:
                                      all_shares[shnum],
                                      encprivkey)
             final_shares[shnum] = final_share
+        elapsed = time.time() - started
+        self._status.timings["pack"] = elapsed
         return (seqnum, root_hash, final_shares, target_info)
 
 
     def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV):
         self.log("_send_shares")
+        started = time.time()
+
         # we're finally ready to send out our shares. If we encounter any
         # surprises here, it's because somebody else is writing at the same
         # time. (Note: in the future, when we remove the _query_peers() step
@@ -1417,10 +1494,17 @@ class Publish:
             d = self._do_testreadwrite(peerid, secrets,
                                        tw_vectors, read_vector)
             d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
-                          peerid, expected_old_shares[peerid], dispatch_map)
+                          peerid, expected_old_shares[peerid], dispatch_map,
+                          started)
             dl.append(d)
 
         d = defer.DeferredList(dl)
+        def _done_sending(res):
+            elapsed = time.time() - started
+            self._status.timings["push"] = elapsed
+            self._status.sharemap = dispatch_map
+            return res
+        d.addCallback(_done_sending)
         d.addCallback(lambda res: (self._surprised, dispatch_map))
         return d
 
@@ -1438,9 +1522,12 @@ class Publish:
 
     def _got_write_answer(self, answer, tw_vectors, my_checkstring,
                           peerid, expected_old_shares,
-                          dispatch_map):
+                          dispatch_map, started):
         lp = self.log("_got_write_answer from %s" %
                       idlib.shortnodeid_b2a(peerid))
+        elapsed = time.time() - started
+        self._status.add_per_server_time(peerid, "write", elapsed)
+
         wrote, read_data = answer
         surprised = False
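As in _query_peers, the started timestamp passed to _got_write_answer is
captured once in _send_shares, before the loop, so each peer's recorded
"write" elapsed is measured from the start of the whole batch rather than from
that peer's individual callRemote; since the sends are dispatched in a tight
loop, this is still a close approximation of per-RPC latency. A condensed
sketch of the convention (peer ids are hypothetical):

    import time

    per_server = {}

    def add_per_server_time(peerid, op, elapsed):
        per_server.setdefault(peerid, []).append((op, elapsed))

    started = time.time()                # one clock for the whole batch
    for peerid in ["peer-a", "peer-b"]:  # hypothetical peer ids
        # the real code fires a remote write here; its callback then does:
        add_per_server_time(peerid, "write", time.time() - started)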