From dd4a95177095c3d299f9ada7ef89cd41b9d16450 Mon Sep 17 00:00:00 2001 From: Brian Warner <warner@allmydata.com> Date: Wed, 5 Mar 2008 18:41:10 -0700 Subject: [PATCH] webish: add publish status --- src/allmydata/mutable.py | 103 +++++++++++++++++++-- src/allmydata/web/publish-status.xhtml | 37 ++++++++ src/allmydata/web/retrieve-status.xhtml | 5 +- src/allmydata/web/status.py | 114 +++++++++++++++++++++++- 4 files changed, 249 insertions(+), 10 deletions(-) diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index 0cf7eed4..8d861ea6 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -248,6 +248,7 @@ class RetrieveStatus: def __init__(self): self.timings = {} self.timings["fetch_per_server"] = {} + self.timings["cumulative_verify"] = 0.0 self.sharemap = {} self.problems = {} self.active = True @@ -534,11 +535,18 @@ class Retrieve: if verinfo not in self._valid_versions: # it's a new pair. Verify the signature. + started = time.time() valid = self._pubkey.verify(prefix, signature) + # this records the total verification time for all versions we've + # seen. This time is included in "fetch". + elapsed = time.time() - started + self._status.timings["cumulative_verify"] += elapsed + if not valid: self._status.problems[peerid] = "sh#%d: invalid signature" % shnum raise CorruptShareError(peerid, shnum, "signature is invalid") + # ok, it's a valid verinfo. Add it to the list of validated # versions. 
self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d" @@ -878,20 +886,34 @@ class PublishStatus: statusid_counter = count(0) def __init__(self): self.timings = {} - self.sharemap = None + self.timings["per_server"] = {} + self.privkey_from = None + self.peers_queried = None + self.sharemap = None # DictOfSets + self.problems = {} self.active = True self.storage_index = None self.helper = False + self.encoding = ("?", "?") + self.initial_read_size = None self.size = None self.status = "Not started" self.progress = 0.0 self.counter = self.statusid_counter.next() self.started = time.time() + def add_per_server_time(self, peerid, op, elapsed): + assert op in ("read", "write") + if peerid not in self.timings["per_server"]: + self.timings["per_server"][peerid] = [] + self.timings["per_server"][peerid].append((op,elapsed)) + def get_started(self): return self.started def get_storage_index(self): return self.storage_index + def get_encoding(self): + return self.encoding def using_helper(self): return self.helper def get_size(self): @@ -909,6 +931,8 @@ class PublishStatus: self.storage_index = si def set_helper(self, helper): self.helper = helper + def set_encoding(self, k, n): + self.encoding = (k, n) def set_size(self, size): self.size = size def set_status(self, status): @@ -932,6 +956,7 @@ class Publish: self._status.set_helper(False) self._status.set_progress(0.0) self._status.set_active(True) + self._started = time.time() def log(self, *args, **kwargs): if 'parent' not in kwargs: @@ -962,7 +987,6 @@ class Publish: # 5: when enough responses are back, we're done self.log("starting publish, datalen is %s" % len(newdata)) - self._started = time.time() self._status.set_size(len(newdata)) self._writekey = self._node.get_writekey() @@ -978,6 +1002,7 @@ class Publish: required_shares = self._node.get_required_shares() total_shares = self._node.get_total_shares() self._pubkey = self._node.get_pubkey() + self._status.set_encoding(required_shares, total_shares) # 
these two may not be, we might have to get them from the first peer self._privkey = self._node.get_privkey() @@ -995,10 +1020,13 @@ class Publish: # with up to 7 entries, allowing us to make an update in 2 RTT # instead of 3. self._read_size = 1000 + self._status.initial_read_size = self._read_size d = defer.succeed(total_shares) d.addCallback(self._query_peers) + d.addCallback(self._query_peers_done) d.addCallback(self._obtain_privkey) + d.addCallback(self._obtain_privkey_done) d.addCallback(self._encrypt_and_encode, newdata, readkey, IV, required_shares, total_shares) @@ -1011,6 +1039,9 @@ class Publish: def _query_peers(self, total_shares): self.log("_query_peers") + self._query_peers_started = now = time.time() + elapsed = now - self._started + self._status.timings["setup"] = elapsed storage_index = self._storage_index @@ -1040,16 +1071,18 @@ class Publish: EPSILON = total_shares / 2 #partial_peerlist = islice(peerlist, total_shares + EPSILON) partial_peerlist = peerlist[:total_shares+EPSILON] + self._status.peers_queried = len(partial_peerlist) self._storage_servers = {} + started = time.time() dl = [] for permutedid, (peerid, ss) in enumerate(partial_peerlist): self._storage_servers[peerid] = ss d = self._do_query(ss, peerid, storage_index) d.addCallback(self._got_query_results, peerid, permutedid, - reachable_peers, current_share_peers) + reachable_peers, current_share_peers, started) dl.append(d) d = defer.DeferredList(dl) d.addCallback(self._got_all_query_results, @@ -1072,10 +1105,13 @@ class Publish: return d def _got_query_results(self, datavs, peerid, permutedid, - reachable_peers, current_share_peers): + reachable_peers, current_share_peers, started): lp = self.log(format="_got_query_results from %(peerid)s", peerid=idlib.shortnodeid_b2a(peerid)) + elapsed = time.time() - started + self._status.add_per_server_time(peerid, "read", elapsed) + assert isinstance(datavs, dict) reachable_peers[peerid] = permutedid if not datavs: @@ -1164,6 +1200,7 @@ class 
Publish: total_shares, reachable_peers, current_share_peers): self.log("_got_all_query_results") + # now that we know everything about the shares currently out there, # decide where to place the new shares. @@ -1230,16 +1267,25 @@ class Publish: target_info = (target_map, shares_per_peer) return target_info + def _query_peers_done(self, target_info): + self._obtain_privkey_started = now = time.time() + elapsed = time.time() - self._query_peers_started + self._status.timings["query"] = elapsed + return target_info + def _obtain_privkey(self, target_info): # make sure we've got a copy of our private key. if self._privkey: # Must have picked it up during _query_peers. We're good to go. + if "privkey_fetch" not in self._status.timings: + self._status.timings["privkey_fetch"] = 0.0 return target_info # Nope, we haven't managed to grab a copy, and we still need it. Ask # peers one at a time until we get a copy. Only bother asking peers # who've admitted to holding a share. + self._privkey_fetch_started = time.time() target_map, shares_per_peer = target_info # pull shares from self._encprivkey_shares if not self._encprivkey_shares: @@ -1255,25 +1301,44 @@ class Publish: return d def _do_privkey_query(self, rref, peerid, shnum, offset, length): + started = time.time() d = rref.callRemote("slot_readv", self._storage_index, [shnum], [(offset, length)] ) - d.addCallback(self._privkey_query_response, peerid, shnum) + d.addCallback(self._privkey_query_response, peerid, shnum, started) return d - def _privkey_query_response(self, datav, peerid, shnum): + def _privkey_query_response(self, datav, peerid, shnum, started): + elapsed = time.time() - started + self._status.add_per_server_time(peerid, "read", elapsed) + data = datav[shnum][0] self._try_to_validate_privkey(data, peerid, shnum) + elapsed = time.time() - self._privkey_fetch_started + self._status.timings["privkey_fetch"] = elapsed + self._status.privkey_from = peerid + + def _obtain_privkey_done(self, target_info): + 
elapsed = time.time() - self._obtain_privkey_started + self._status.timings["privkey"] = elapsed + return target_info + def _encrypt_and_encode(self, target_info, newdata, readkey, IV, required_shares, total_shares): self.log("_encrypt_and_encode") + started = time.time() + key = hashutil.ssk_readkey_data_hash(IV, readkey) enc = AES(key) crypttext = enc.process(newdata) assert len(crypttext) == len(newdata) + now = time.time() + self._status.timings["encrypt"] = now - started + started = now + # now apply FEC self.MAX_SEGMENT_SIZE = 1024*1024 data_length = len(crypttext) @@ -1298,6 +1363,11 @@ class Publish: assert len(piece) == piece_size d = fec.encode(crypttext_pieces) + def _done_encoding(res): + elapsed = time.time() - started + self._status.timings["encode"] = elapsed + return res + d.addCallback(_done_encoding) d.addCallback(lambda shares_and_shareids: (shares_and_shareids, required_shares, total_shares, @@ -1311,6 +1381,7 @@ class Publish: target_info), seqnum, IV): self.log("_generate_shares") + started = time.time() # we should know these by now privkey = self._privkey @@ -1356,7 +1427,9 @@ class Publish: # then they all share the same encprivkey at the end. The sizes # of everything are the same for all shares. + sign_started = time.time() signature = privkey.sign(prefix) + self._status.timings["sign"] = time.time() - sign_started verification_key = pubkey.serialize() @@ -1370,11 +1443,15 @@ class Publish: all_shares[shnum], encprivkey) final_shares[shnum] = final_share + elapsed = time.time() - started + self._status.timings["pack"] = elapsed return (seqnum, root_hash, final_shares, target_info) def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV): self.log("_send_shares") + started = time.time() + # we're finally ready to send out our shares. If we encounter any # surprises here, it's because somebody else is writing at the same # time. 
(Note: in the future, when we remove the _query_peers() step @@ -1417,10 +1494,17 @@ class Publish: d = self._do_testreadwrite(peerid, secrets, tw_vectors, read_vector) d.addCallback(self._got_write_answer, tw_vectors, my_checkstring, - peerid, expected_old_shares[peerid], dispatch_map) + peerid, expected_old_shares[peerid], dispatch_map, + started) dl.append(d) d = defer.DeferredList(dl) + def _done_sending(res): + elapsed = time.time() - started + self._status.timings["push"] = elapsed + self._status.sharemap = dispatch_map + return res + d.addCallback(_done_sending) d.addCallback(lambda res: (self._surprised, dispatch_map)) return d @@ -1438,9 +1522,12 @@ class Publish: def _got_write_answer(self, answer, tw_vectors, my_checkstring, peerid, expected_old_shares, - dispatch_map): + dispatch_map, started): lp = self.log("_got_write_answer from %s" % idlib.shortnodeid_b2a(peerid)) + elapsed = time.time() - started + self._status.add_per_server_time(peerid, "write", elapsed) + wrote, read_data = answer surprised = False diff --git a/src/allmydata/web/publish-status.xhtml b/src/allmydata/web/publish-status.xhtml index f6b862c3..9250371b 100644 --- a/src/allmydata/web/publish-status.xhtml +++ b/src/allmydata/web/publish-status.xhtml @@ -19,4 +19,41 @@ <li>Status: <span n:render="status"/></li> </ul> +<h2>Publish Results</h2> +<ul> + <li n:render="encoding" /> + <li n:render="peers_queried" /> + <li n:render="problems" /> + <li n:render="sharemap" /> + <li>Timings:</li> + <ul> + <li>Total: <span n:render="time" n:data="time_total" /> + (<span n:render="rate" n:data="rate_total" />)</li> + <ul> + <li>Setup: <span n:render="time" n:data="time_setup" /></li> + <li>Initial Version Query: <span n:render="time" n:data="time_query" /> + (read size <span n:render="string" n:data="initial_read_size"/> bytes)</li> + <li>Obtain Privkey: <span n:render="time" n:data="time_privkey" /> + <ul> + <li>Separate Privkey Fetch: <span n:render="time" n:data="time_privkey_fetch" /> <span 
n:render="privkey_from"/></li> + </ul></li> + <li>Encrypting: <span n:render="time" n:data="time_encrypt" /> + (<span n:render="rate" n:data="rate_encrypt" />)</li> + <li>Encoding: <span n:render="time" n:data="time_encode" /> + (<span n:render="rate" n:data="rate_encode" />)</li> + <li>Packing Shares: <span n:render="time" n:data="time_pack" /> + (<span n:render="rate" n:data="rate_pack" />) + <ul> + <li>RSA Signature: <span n:render="time" n:data="time_sign" /></li> + </ul></li> + + <li>Pushing: <span n:render="time" n:data="time_push" /> + (<span n:render="rate" n:data="rate_push" />)</li> + </ul> + <li n:render="server_timings" /> + </ul> +</ul> + +<div>Return to the <a href="/">Welcome Page</a></div> + </body></html> diff --git a/src/allmydata/web/retrieve-status.xhtml b/src/allmydata/web/retrieve-status.xhtml index a6371ecd..fe0b9e33 100644 --- a/src/allmydata/web/retrieve-status.xhtml +++ b/src/allmydata/web/retrieve-status.xhtml @@ -31,7 +31,10 @@ <ul> <li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li> <li>Fetching: <span n:render="time" n:data="time_fetch" /> - (<span n:render="rate" n:data="rate_fetch" />)</li> + (<span n:render="rate" n:data="rate_fetch" />) + <ul> + <li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li> + </ul></li> <li>Decoding: <span n:render="time" n:data="time_decode" /> (<span n:render="rate" n:data="rate_decode" />)</li> <li>Decrypting: <span n:render="time" n:data="time_decrypt" /> diff --git a/src/allmydata/web/status.py b/src/allmydata/web/status.py index 373ab689..28f5df37 100644 --- a/src/allmydata/web/status.py +++ b/src/allmydata/web/status.py @@ -492,6 +492,9 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin): def data_rate_fetch(self, ctx, data): return self._get_rate(data, "fetch") + def data_time_cumulative_verify(self, ctx, data): + return self.retrieve_status.timings.get("cumulative_verify") + def data_time_decode(self, ctx, data): return 
self.retrieve_status.timings.get("decode") def data_rate_decode(self, ctx, data): @@ -515,9 +518,13 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin): return T.li["Per-Server Fetch Response Times: ", l] -class PublishStatusPage(rend.Page): +class PublishStatusPage(rend.Page, RateAndTimeMixin): docFactory = getxmlfile("publish-status.xhtml") + def __init__(self, data): + rend.Page.__init__(self, data) + self.publish_status = data + def render_started(self, ctx, data): TIME_FORMAT = "%H:%M:%S %d-%b-%Y" started_s = time.strftime(TIME_FORMAT, @@ -548,6 +555,111 @@ class PublishStatusPage(rend.Page): def render_status(self, ctx, data): return data.get_status() + def render_encoding(self, ctx, data): + k, n = data.get_encoding() + return ctx.tag["Encoding: %s of %s" % (k, n)] + + def render_peers_queried(self, ctx, data): + return ctx.tag["Peers Queried: ", data.peers_queried] + + def render_sharemap(self, ctx, data): + sharemap = data.sharemap + if sharemap is None: + return ctx.tag["None"] + l = T.ul() + for shnum in sorted(sharemap.keys()): + l[T.li["%d -> Placed on " % shnum, + ", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid) + for (peerid,seqnum,root_hash) + in sharemap[shnum]])]] + return ctx.tag["Sharemap:", l] + + def render_problems(self, ctx, data): + problems = data.problems + if not problems: + return "" + l = T.ul() + for peerid in sorted(problems.keys()): + peerid_s = idlib.shortnodeid_b2a(peerid) + l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]] + return ctx.tag["Server Problems:", l] + + def _get_rate(self, data, name): + file_size = self.publish_status.get_size() + time = self.publish_status.timings.get(name) + if time is None: + return None + try: + return 1.0 * file_size / time + except ZeroDivisionError: + return None + + def data_time_total(self, ctx, data): + return self.publish_status.timings.get("total") + def data_rate_total(self, ctx, data): + return self._get_rate(data, "total") + + def data_time_setup(self, ctx, data): + return 
self.publish_status.timings.get("setup") + + def data_time_query(self, ctx, data): + return self.publish_status.timings.get("query") + + def data_time_privkey(self, ctx, data): + return self.publish_status.timings.get("privkey") + + def data_time_privkey_fetch(self, ctx, data): + return self.publish_status.timings.get("privkey_fetch") + def render_privkey_from(self, ctx, data): + peerid = data.privkey_from + if peerid: + return " (got from [%s])" % idlib.shortnodeid_b2a(peerid) + else: + return "" + + def data_time_encrypt(self, ctx, data): + return self.publish_status.timings.get("encrypt") + def data_rate_encrypt(self, ctx, data): + return self._get_rate(data, "encrypt") + + def data_time_encode(self, ctx, data): + return self.publish_status.timings.get("encode") + def data_rate_encode(self, ctx, data): + return self._get_rate(data, "encode") + + def data_time_pack(self, ctx, data): + return self.publish_status.timings.get("pack") + def data_rate_pack(self, ctx, data): + return self._get_rate(data, "pack") + def data_time_sign(self, ctx, data): + return self.publish_status.timings.get("sign") + + def data_time_push(self, ctx, data): + return self.publish_status.timings.get("push") + def data_rate_push(self, ctx, data): + return self._get_rate(data, "push") + + def data_initial_read_size(self, ctx, data): + return self.publish_status.initial_read_size + + def render_server_timings(self, ctx, data): + per_server = self.publish_status.timings.get("per_server") + if not per_server: + return "" + l = T.ul() + for peerid in sorted(per_server.keys()): + peerid_s = idlib.shortnodeid_b2a(peerid) + times = [] + for op,t in per_server[peerid]: + if op == "read": + times.append( "(" + self.render_time(None, t) + ")" ) + else: + times.append( self.render_time(None, t) ) + times_s = ", ".join(times) + l[T.li["[%s]: %s" % (peerid_s, times_s)]] + return T.li["Per-Server Response Times: ", l] + + class Status(rend.Page): docFactory = getxmlfile("status.xhtml") addSlash = True -- 
2.45.2