From ec23da28a6a8daf6c6bd61a86bc0bfc8d42063e7 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 4 Mar 2008 21:04:36 -0700 Subject: [PATCH] webish: add more mutable-retrieve timing status --- src/allmydata/mutable.py | 59 ++++++++- src/allmydata/web/retrieve-status.xhtml | 24 ++++ src/allmydata/webish.py | 162 ++++++++++++++++-------- 3 files changed, 185 insertions(+), 60 deletions(-) diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index f983d3b7..49569852 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -203,10 +203,14 @@ class RetrieveStatus: statusid_counter = count(0) def __init__(self): self.timings = {} - self.sharemap = None + self.timings["fetch_per_server"] = {} + self.sharemap = {} + self.problems = {} self.active = True self.storage_index = None self.helper = False + self.encoding = ("?","?") + self.search_distance = None self.size = None self.status = "Not started" self.progress = 0.0 @@ -217,6 +221,10 @@ class RetrieveStatus: return self.started def get_storage_index(self): return self.storage_index + def get_encoding(self): + return self.encoding + def get_search_distance(self): + return self.search_distance def using_helper(self): return self.helper def get_size(self): @@ -234,6 +242,10 @@ class RetrieveStatus: self.storage_index = si def set_helper(self, helper): self.helper = helper + def set_encoding(self, k, n): + self.encoding = (k, n) + def set_search_distance(self, value): + self.search_distance = value def set_size(self, size): self.size = size def set_status(self, status): @@ -346,6 +358,7 @@ class Retrieve: # the hashes over and over again. self._valid_shares = {} + self._started = time.time() self._done_deferred = defer.Deferred() d = defer.succeed(initial_query_count) @@ -359,6 +372,7 @@ class Retrieve: def _choose_initial_peers(self, numqueries): n = self._node + started = time.time() full_peerlist = n._client.get_permuted_peers("storage", self._storage_index) @@ -373,9 +387,13 @@ class Retrieve: # we later increase this limit, it may be useful to re-scan the # permuted list. self._peerlist_limit = numqueries + self._status.set_search_distance(len(self._peerlist)) + elapsed = time.time() - started + self._status.timings["peer_selection"] = elapsed return self._peerlist def _send_initial_requests(self, peerlist): + self._first_query_sent = time.time() self._bad_peerids = set() self._running = True self._queries_outstanding = set() @@ -392,9 +410,11 @@ class Retrieve: return None def _do_query(self, ss, peerid, storage_index, readsize): + started = time.time() self._queries_outstanding.add(peerid) d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)]) - d.addCallback(self._got_results, peerid, readsize, (ss, storage_index)) + d.addCallback(self._got_results, peerid, readsize, (ss, storage_index), + started) d.addErrback(self._query_failed, peerid) # errors that aren't handled by _query_failed (and errors caused by # _query_failed) get logged, but we still want to check for doneness. @@ -406,12 +426,19 @@ class Retrieve: verifier = rsa.create_verifying_key_from_string(pubkey_s) return verifier - def _got_results(self, datavs, peerid, readsize, stuff): + def _got_results(self, datavs, peerid, readsize, stuff, started): + elapsed = time.time() - started + if peerid not in self._status.timings["fetch_per_server"]: + self._status.timings["fetch_per_server"][peerid] = [] + self._status.timings["fetch_per_server"][peerid].append(elapsed) self._queries_outstanding.discard(peerid) self._used_peers.add(peerid) if not self._running: return + if peerid not in self._status.sharemap: + self._status.sharemap[peerid] = set() + for shnum,datav in datavs.items(): data = datav[0] try: @@ -447,16 +474,20 @@ class Retrieve: fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) assert len(fingerprint) == 32 if fingerprint != self._node._fingerprint: + self._status.problems[peerid] = "sh#%d: pubkey doesn't match fingerprint" % shnum raise CorruptShareError(peerid, shnum, "pubkey doesn't match fingerprint") self._pubkey = self._deserialize_pubkey(pubkey_s) self._node._populate_pubkey(self._pubkey) verinfo = (seqnum, root_hash, IV, segsize, datalength) + self._status.sharemap[peerid].add(verinfo) + if verinfo not in self._valid_versions: # it's a new pair. Verify the signature. valid = self._pubkey.verify(prefix, signature) if not valid: + self._status.problems[peerid] = "sh#%d: invalid signature" % shnum raise CorruptShareError(peerid, shnum, "signature is invalid") # ok, it's a valid verinfo. Add it to the list of validated @@ -486,11 +517,15 @@ class Retrieve: # rest of the shares), we need to implement the refactoring mentioned # above. if k != self._required_shares: + self._status.problems[peerid] = "sh#%d: k=%d, we want %d" \ + % (shnum, k, self._required_shares) raise CorruptShareError(peerid, shnum, "share has k=%d, we want k=%d" % (k, self._required_shares)) if N != self._total_shares: + self._status.problems[peerid] = "sh#%d: N=%d, we want %d" \ + % (shnum, N, self._total_shares) raise CorruptShareError(peerid, shnum, "share has N=%d, we want N=%d" % (N, self._total_shares)) @@ -587,14 +622,19 @@ class Retrieve: level=log.UNUSUAL) # are there any peers on the list that we haven't used? new_query_peers = [] - for (peerid, ss) in self._peerlist: + peer_indicies = [] + for i, (peerid, ss) in enumerate(self._peerlist): if peerid not in self._used_peers: new_query_peers.append( (peerid, ss) ) + peer_indicies.append(i) if len(new_query_peers) > 5: # only query in batches of 5. TODO: this is pretty # arbitrary, really I want this to be something like # k - max(known_version_sharecounts) + some extra break + new_search_distance = max(max(peer_indicies), + self._status.get_search_distance()) + self._status.set_search_distance(new_search_distance) if new_query_peers: self.log("sending %d new queries (read %d bytes)" % (len(new_query_peers), self._read_size), level=log.UNUSUAL) @@ -671,6 +711,8 @@ class Retrieve: # now that the big loop is done, all shares in the sharemap are # valid, and they're all for the same seqnum+root_hash version, so # it's now down to doing FEC and decrypt. + elapsed = time.time() - self._started + self._status.timings["fetch"] = elapsed assert len(shares) >= self._required_shares, len(shares) d = defer.maybeDeferred(self._decode, shares, segsize, datalength) d.addCallback(self._decrypt, IV, seqnum, root_hash) @@ -728,8 +770,12 @@ class Retrieve: self.log("params %s, we have %d shares" % (params, len(shares))) self.log("about to decode, shareids=%s" % (shareids,)) + started = time.time() d = defer.maybeDeferred(fec.decode, shares, shareids) def _done(buffers): + elapsed = time.time() - started + self._status.timings["decode"] = elapsed + self._status.set_encoding(self._required_shares, self._total_shares) self.log(" decode done, %d buffers" % len(buffers)) segment = "".join(buffers) self.log(" joined length %d, datalength %d" % @@ -745,9 +791,12 @@ class Retrieve: return d def _decrypt(self, crypttext, IV, seqnum, root_hash): + started = time.time() key = hashutil.ssk_readkey_data_hash(IV, self._readkey) decryptor = AES(key) plaintext = decryptor.process(crypttext) + elapsed = time.time() - started + self._status.timings["decrypt"] = elapsed # it worked, so record the seqnum and root_hash for next time self._node._populate_seqnum(seqnum) self._node._populate_root_hash(root_hash) @@ -760,6 +809,8 @@ class Retrieve: self._status.set_status("Done") self._status.set_progress(1.0) self._status.set_size(len(contents)) + elapsed = time.time() - self._started + self._status.timings["total"] = elapsed eventually(self._done_deferred.callback, contents) def get_status(self): diff --git a/src/allmydata/web/retrieve-status.xhtml b/src/allmydata/web/retrieve-status.xhtml index 422cdcd8..a6371ecd 100644 --- a/src/allmydata/web/retrieve-status.xhtml +++ b/src/allmydata/web/retrieve-status.xhtml @@ -19,4 +19,28 @@
  • Status:
  • +

    Retrieve Results

    + + +
    Return to the Welcome Page
    + diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py index bae3d7d1..1cab77a3 100644 --- a/src/allmydata/webish.py +++ b/src/allmydata/webish.py @@ -1373,12 +1373,42 @@ class UnlinkedPUTCreateDirectory(rend.Page): # XXX add redirect_to_result return d -def plural(sequence): - if len(sequence) == 1: +def plural(sequence_or_length): + if isinstance(sequence_or_length, int): + length = sequence_or_length + else: + length = len(sequence_or_length) + if length == 1: return "" return "s" -class UploadResultsRendererMixin: +class RateAndTimeMixin: + + def render_time(self, ctx, data): + # 1.23s, 790ms, 132us + if data is None: + return "" + s = float(data) + if s >= 1.0: + return "%.2fs" % s + if s >= 0.01: + return "%dms" % (1000*s) + if s >= 0.001: + return "%.1fms" % (1000*s) + return "%dus" % (1000000*s) + + def render_rate(self, ctx, data): + # 21.8kBps, 554.4kBps 4.37MBps + if data is None: + return "" + r = float(data) + if r > 1000000: + return "%1.2fMBps" % (r/1000000) + if r > 1000: + return "%.1fkBps" % (r/1000) + return "%dBps" % r + +class UploadResultsRendererMixin(RateAndTimeMixin): # this requires a method named 'upload_results' def render_sharemap(self, ctx, data): @@ -1417,30 +1447,6 @@ class UploadResultsRendererMixin: d.addCallback(lambda res: res.file_size) return d - def render_time(self, ctx, data): - # 1.23s, 790ms, 132us - if data is None: - return "" - s = float(data) - if s >= 1.0: - return "%.2fs" % s - if s >= 0.01: - return "%dms" % (1000*s) - if s >= 0.001: - return "%.1fms" % (1000*s) - return "%dus" % (1000000*s) - - def render_rate(self, ctx, data): - # 21.8kBps, 554.4kBps 4.37MBps - if data is None: - return "" - r = float(data) - if r > 1000000: - return "%1.2fMBps" % (r/1000000) - if r > 1000: - return "%.1fkBps" % (r/1000) - return "%dBps" % r - def _get_time(self, name): d = self.upload_results() d.addCallback(lambda res: res.timings.get(name)) @@ -1678,7 +1684,7 @@ class UploadStatusPage(UploadResultsRendererMixin, rend.Page): def render_status(self, ctx, data): return data.get_status() -class DownloadResultsRendererMixin: +class DownloadResultsRendererMixin(RateAndTimeMixin): # this requires a method named 'download_results' def render_servermap(self, ctx, data): @@ -1730,30 +1736,6 @@ class DownloadResultsRendererMixin: d.addCallback(lambda res: res.file_size) return d - def render_time(self, ctx, data): - # 1.23s, 790ms, 132us - if data is None: - return "" - s = float(data) - if s >= 1.0: - return "%.2fs" % s - if s >= 0.01: - return "%dms" % (1000*s) - if s >= 0.001: - return "%.1fms" % (1000*s) - return "%dus" % (1000000*s) - - def render_rate(self, ctx, data): - # 21.8kBps, 554.4kBps 4.37MBps - if data is None: - return "" - r = float(data) - if r > 1000000: - return "%1.2fMBps" % (r/1000000) - if r > 1000: - return "%.1fkBps" % (r/1000) - return "%dBps" % r - def _get_time(self, name): d = self.download_results() d.addCallback(lambda res: res.timings.get(name)) @@ -1877,9 +1859,13 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page): def render_status(self, ctx, data): return data.get_status() -class RetrieveStatusPage(rend.Page): +class RetrieveStatusPage(rend.Page, RateAndTimeMixin): docFactory = getxmlfile("retrieve-status.xhtml") + def __init__(self, data): + rend.Page.__init__(self, data) + self.retrieve_status = data + def render_started(self, ctx, data): TIME_FORMAT = "%H:%M:%S %d-%b-%Y" started_s = time.strftime(TIME_FORMAT, @@ -1910,6 +1896,70 @@ class RetrieveStatusPage(rend.Page): def render_status(self, ctx, data): return data.get_status() + def render_encoding(self, ctx, data): + k, n = data.get_encoding() + return ctx.tag["Encoding: %s of %s" % (k, n)] + + def render_search_distance(self, ctx, data): + d = data.get_search_distance() + return ctx.tag["Search Distance: %s peer%s" % (d, plural(d))] + + def render_problems(self, ctx, data): + problems = data.problems + if not problems: + return "" + l = T.ul() + for peerid in sorted(problems.keys()): + peerid_s = idlib.shortnodeid_b2a(peerid) + l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]] + return ctx.tag["Server Problems:", l] + + def _get_rate(self, data, name): + file_size = self.retrieve_status.get_size() + time = self.retrieve_status.timings.get(name) + if time is None: + return None + try: + return 1.0 * file_size / time + except ZeroDivisionError: + return None + + def data_time_total(self, ctx, data): + return self.retrieve_status.timings.get("total") + def data_rate_total(self, ctx, data): + return self._get_rate(data, "total") + + def data_time_peer_selection(self, ctx, data): + return self.retrieve_status.timings.get("peer_selection") + + def data_time_fetch(self, ctx, data): + return self.retrieve_status.timings.get("fetch") + def data_rate_fetch(self, ctx, data): + return self._get_rate(data, "fetch") + + def data_time_decode(self, ctx, data): + return self.retrieve_status.timings.get("decode") + def data_rate_decode(self, ctx, data): + return self._get_rate(data, "decode") + + def data_time_decrypt(self, ctx, data): + return self.retrieve_status.timings.get("decrypt") + def data_rate_decrypt(self, ctx, data): + return self._get_rate(data, "decrypt") + + def render_server_timings(self, ctx, data): + per_server = self.retrieve_status.timings.get("fetch_per_server") + if not per_server: + return "" + l = T.ul() + for peerid in sorted(per_server.keys()): + peerid_s = idlib.shortnodeid_b2a(peerid) + times_s = ", ".join([self.render_time(None, t) + for t in per_server[peerid]]) + l[T.li["[%s]: %s" % (peerid_s, times_s)]] + return T.li["Per-Server Fetch Response Times: ", l] + + class PublishStatusPage(rend.Page): docFactory = getxmlfile("publish-status.xhtml") @@ -1956,9 +2006,9 @@ class Status(rend.Page): def data_recent_operations(self, ctx, data): recent = [o for o in (IClient(ctx).list_recent_uploads() + - IClient(ctx).list_recent_downloads() + - IClient(ctx).list_recent_publish() + - IClient(ctx).list_recent_retrieve()) + IClient(ctx).list_recent_downloads() + + IClient(ctx).list_recent_publish() + + IClient(ctx).list_recent_retrieve()) if not o.get_active()] recent.sort(lambda a,b: cmp(a.get_started(), b.get_started())) recent.reverse() -- 2.37.2