From: Brian Warner Date: Thu, 6 Mar 2008 01:41:10 +0000 (-0700) Subject: webish: add publish status X-Git-Tag: allmydata-tahoe-0.9.0~61 X-Git-Url: https://git.rkrishnan.org/pf/content/en/service/contact.html?a=commitdiff_plain;h=dd4a95177095c3d299f9ada7ef89cd41b9d16450;p=tahoe-lafs%2Ftahoe-lafs.git webish: add publish status --- diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index 0cf7eed4..8d861ea6 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -248,6 +248,7 @@ class RetrieveStatus: def __init__(self): self.timings = {} self.timings["fetch_per_server"] = {} + self.timings["cumulative_verify"] = 0.0 self.sharemap = {} self.problems = {} self.active = True @@ -534,11 +535,18 @@ class Retrieve: if verinfo not in self._valid_versions: # it's a new pair. Verify the signature. + started = time.time() valid = self._pubkey.verify(prefix, signature) + # this records the total verification time for all versions we've + # seen. This time is included in "fetch". + elapsed = time.time() - started + self._status.timings["cumulative_verify"] += elapsed + if not valid: self._status.problems[peerid] = "sh#%d: invalid signature" % shnum raise CorruptShareError(peerid, shnum, "signature is invalid") + # ok, it's a valid verinfo. Add it to the list of validated # versions. self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d" @@ -878,20 +886,34 @@ class PublishStatus: statusid_counter = count(0) def __init__(self): self.timings = {} - self.sharemap = None + self.timings["per_server"] = {} + self.privkey_from = None + self.peers_queried = None + self.sharemap = None # DictOfSets + self.problems = {} self.active = True self.storage_index = None self.helper = False + self.encoding = ("?", "?") + self.initial_read_size = None self.size = None self.status = "Not started" self.progress = 0.0 self.counter = self.statusid_counter.next() self.started = time.time() + def add_per_server_time(self, peerid, op, elapsed): + assert op in ("read", "write") + if peerid not in self.timings["per_server"]: + self.timings["per_server"][peerid] = [] + self.timings["per_server"][peerid].append((op,elapsed)) + def get_started(self): return self.started def get_storage_index(self): return self.storage_index + def get_encoding(self): + return self.encoding def using_helper(self): return self.helper def get_size(self): @@ -909,6 +931,8 @@ class PublishStatus: self.storage_index = si def set_helper(self, helper): self.helper = helper + def set_encoding(self, k, n): + self.encoding = (k, n) def set_size(self, size): self.size = size def set_status(self, status): @@ -932,6 +956,7 @@ class Publish: self._status.set_helper(False) self._status.set_progress(0.0) self._status.set_active(True) + self._started = time.time() def log(self, *args, **kwargs): if 'parent' not in kwargs: @@ -962,7 +987,6 @@ class Publish: # 5: when enough responses are back, we're done self.log("starting publish, datalen is %s" % len(newdata)) - self._started = time.time() self._status.set_size(len(newdata)) self._writekey = self._node.get_writekey() @@ -978,6 +1002,7 @@ class Publish: required_shares = self._node.get_required_shares() total_shares = self._node.get_total_shares() self._pubkey = self._node.get_pubkey() + self._status.set_encoding(required_shares, total_shares) # these two may not be, we might have to get them from the first peer self._privkey = self._node.get_privkey() @@ -995,10 +1020,13 @@ class Publish: # with up to 7 entries, allowing us to make an update in 2 RTT # instead of 3. self._read_size = 1000 + self._status.initial_read_size = self._read_size d = defer.succeed(total_shares) d.addCallback(self._query_peers) + d.addCallback(self._query_peers_done) d.addCallback(self._obtain_privkey) + d.addCallback(self._obtain_privkey_done) d.addCallback(self._encrypt_and_encode, newdata, readkey, IV, required_shares, total_shares) @@ -1011,6 +1039,9 @@ class Publish: def _query_peers(self, total_shares): self.log("_query_peers") + self._query_peers_started = now = time.time() + elapsed = now - self._started + self._status.timings["setup"] = elapsed storage_index = self._storage_index @@ -1040,16 +1071,18 @@ class Publish: EPSILON = total_shares / 2 #partial_peerlist = islice(peerlist, total_shares + EPSILON) partial_peerlist = peerlist[:total_shares+EPSILON] + self._status.peers_queried = len(partial_peerlist) self._storage_servers = {} + started = time.time() dl = [] for permutedid, (peerid, ss) in enumerate(partial_peerlist): self._storage_servers[peerid] = ss d = self._do_query(ss, peerid, storage_index) d.addCallback(self._got_query_results, peerid, permutedid, - reachable_peers, current_share_peers) + reachable_peers, current_share_peers, started) dl.append(d) d = defer.DeferredList(dl) d.addCallback(self._got_all_query_results, @@ -1072,10 +1105,13 @@ class Publish: return d def _got_query_results(self, datavs, peerid, permutedid, - reachable_peers, current_share_peers): + reachable_peers, current_share_peers, started): lp = self.log(format="_got_query_results from %(peerid)s", peerid=idlib.shortnodeid_b2a(peerid)) + elapsed = time.time() - started + self._status.add_per_server_time(peerid, "read", elapsed) + assert isinstance(datavs, dict) reachable_peers[peerid] = permutedid if not datavs: @@ -1164,6 +1200,7 @@ class Publish: total_shares, reachable_peers, current_share_peers): self.log("_got_all_query_results") + # now that we know everything about the shares currently out there, # decide where to place the new shares. @@ -1230,16 +1267,25 @@ class Publish: target_info = (target_map, shares_per_peer) return target_info + def _query_peers_done(self, target_info): + self._obtain_privkey_started = now = time.time() + elapsed = time.time() - self._query_peers_started + self._status.timings["query"] = elapsed + return target_info + def _obtain_privkey(self, target_info): # make sure we've got a copy of our private key. if self._privkey: # Must have picked it up during _query_peers. We're good to go. + if "privkey_fetch" not in self._status.timings: + self._status.timings["privkey_fetch"] = 0.0 return target_info # Nope, we haven't managed to grab a copy, and we still need it. Ask # peers one at a time until we get a copy. Only bother asking peers # who've admitted to holding a share. + self._privkey_fetch_started = time.time() target_map, shares_per_peer = target_info # pull shares from self._encprivkey_shares if not self._encprivkey_shares: @@ -1255,25 +1301,44 @@ class Publish: return d def _do_privkey_query(self, rref, peerid, shnum, offset, length): + started = time.time() d = rref.callRemote("slot_readv", self._storage_index, [shnum], [(offset, length)] ) - d.addCallback(self._privkey_query_response, peerid, shnum) + d.addCallback(self._privkey_query_response, peerid, shnum, started) return d - def _privkey_query_response(self, datav, peerid, shnum): + def _privkey_query_response(self, datav, peerid, shnum, started): + elapsed = time.time() - started + self._status.add_per_server_time(peerid, "read", elapsed) + data = datav[shnum][0] self._try_to_validate_privkey(data, peerid, shnum) + elapsed = time.time() - self._privkey_fetch_started + self._status.timings["privkey_fetch"] = elapsed + self._status.privkey_from = peerid + + def _obtain_privkey_done(self, target_info): + elapsed = time.time() - self._obtain_privkey_started + self._status.timings["privkey"] = elapsed + return target_info + def _encrypt_and_encode(self, target_info, newdata, readkey, IV, required_shares, total_shares): self.log("_encrypt_and_encode") + started = time.time() + key = hashutil.ssk_readkey_data_hash(IV, readkey) enc = AES(key) crypttext = enc.process(newdata) assert len(crypttext) == len(newdata) + now = time.time() + self._status.timings["encrypt"] = now - started + started = now + # now apply FEC self.MAX_SEGMENT_SIZE = 1024*1024 data_length = len(crypttext) @@ -1298,6 +1363,11 @@ class Publish: assert len(piece) == piece_size d = fec.encode(crypttext_pieces) + def _done_encoding(res): + elapsed = time.time() - started + self._status.timings["encode"] = elapsed + return res + d.addCallback(_done_encoding) d.addCallback(lambda shares_and_shareids: (shares_and_shareids, required_shares, total_shares, @@ -1311,6 +1381,7 @@ class Publish: target_info), seqnum, IV): self.log("_generate_shares") + started = time.time() # we should know these by now privkey = self._privkey @@ -1356,7 +1427,9 @@ class Publish: # then they all share the same encprivkey at the end. The sizes # of everything are the same for all shares. + sign_started = time.time() signature = privkey.sign(prefix) + self._status.timings["sign"] = time.time() - sign_started verification_key = pubkey.serialize() @@ -1370,11 +1443,15 @@ class Publish: all_shares[shnum], encprivkey) final_shares[shnum] = final_share + elapsed = time.time() - started + self._status.timings["pack"] = elapsed return (seqnum, root_hash, final_shares, target_info) def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV): self.log("_send_shares") + started = time.time() + # we're finally ready to send out our shares. If we encounter any # surprises here, it's because somebody else is writing at the same # time. (Note: in the future, when we remove the _query_peers() step @@ -1417,10 +1494,17 @@ class Publish: d = self._do_testreadwrite(peerid, secrets, tw_vectors, read_vector) d.addCallback(self._got_write_answer, tw_vectors, my_checkstring, - peerid, expected_old_shares[peerid], dispatch_map) + peerid, expected_old_shares[peerid], dispatch_map, + started) dl.append(d) d = defer.DeferredList(dl) + def _done_sending(res): + elapsed = time.time() - started + self._status.timings["push"] = elapsed + self._status.sharemap = dispatch_map + return res + d.addCallback(_done_sending) d.addCallback(lambda res: (self._surprised, dispatch_map)) return d @@ -1438,9 +1522,12 @@ class Publish: def _got_write_answer(self, answer, tw_vectors, my_checkstring, peerid, expected_old_shares, - dispatch_map): + dispatch_map, started): lp = self.log("_got_write_answer from %s" % idlib.shortnodeid_b2a(peerid)) + elapsed = time.time() - started + self._status.add_per_server_time(peerid, "write", elapsed) + wrote, read_data = answer surprised = False diff --git a/src/allmydata/web/publish-status.xhtml b/src/allmydata/web/publish-status.xhtml index f6b862c3..9250371b 100644 --- a/src/allmydata/web/publish-status.xhtml +++ b/src/allmydata/web/publish-status.xhtml @@ -19,4 +19,41 @@
  • Status:
  • +

    Retrieve Results

    + + +
    Return to the Welcome Page
    + diff --git a/src/allmydata/web/retrieve-status.xhtml b/src/allmydata/web/retrieve-status.xhtml index a6371ecd..fe0b9e33 100644 --- a/src/allmydata/web/retrieve-status.xhtml +++ b/src/allmydata/web/retrieve-status.xhtml @@ -31,7 +31,10 @@