From 2b49605c51e4ae455c181861aab75b79e9a31c87 Mon Sep 17 00:00:00 2001
From: Brian Warner
Date: Mon, 3 Mar 2008 19:19:21 -0700
Subject: [PATCH] webish: add 'download results', with some basic timing
 information

---
 src/allmydata/checker.py                |   1 +
 src/allmydata/download.py               |  58 ++++++++++-
 src/allmydata/interfaces.py             |  45 ++++++++-
 src/allmydata/offloaded.py              |   2 +-
 src/allmydata/storage.py                |   7 +-
 src/allmydata/web/download-status.xhtml |  28 ++++++
 src/allmydata/webish.py                 | 120 +++++++++++++++++++++++-
 7 files changed, 247 insertions(+), 14 deletions(-)

diff --git a/src/allmydata/checker.py b/src/allmydata/checker.py
index b0855817..3ac3e11f 100644
--- a/src/allmydata/checker.py
+++ b/src/allmydata/checker.py
@@ -122,6 +122,7 @@ class SimpleCHKFileVerifier(download.FileDownloader):
 
         self._paused = False
         self._stopped = False
+        self._results = None
         self.active_buckets = {} # k: shnum, v: bucket
         self._share_buckets = [] # list of (sharenum, bucket) tuples
         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
diff --git a/src/allmydata/download.py b/src/allmydata/download.py
index 907f9387..87b039f2 100644
--- a/src/allmydata/download.py
+++ b/src/allmydata/download.py
@@ -1,16 +1,16 @@
-import os, random, weakref, itertools
+import os, random, weakref, itertools, time
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.internet.interfaces import IPushProducer, IConsumer
 from twisted.application import service
 from foolscap.eventual import eventually
 
-from allmydata.util import base32, mathutil, hashutil, log, idlib
+from allmydata.util import base32, mathutil, hashutil, log
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree, storage, uri
 from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
-     IDownloadStatus
+     IDownloadStatus, IDownloadResults
 from allmydata.encode import NotEnoughPeersError
 from pycryptopp.cipher.aes import AES
 
@@ -28,6 +28,16 @@ class BadCrypttextHashValue(Exception):
 class DownloadStopped(Exception):
     pass
 
+class DownloadResults:
+    implements(IDownloadResults)
+
+    def __init__(self):
+        self.servers_used = set()
+        self.server_problems = {}
+        self.servermap = {}
+        self.timings = {}
+        self.file_size = None
+
 class Output:
     def __init__(self, downloadable, key, total_length, log_parent,
                  download_status):
@@ -338,6 +348,7 @@ class DownloadStatus:
         self.paused = False
         self.stopped = False
         self.active = True
+        self.results = None
         self.counter = self.statusid_counter.next()
 
     def get_storage_index(self):
@@ -357,6 +368,8 @@ class DownloadStatus:
         return self.progress
     def get_active(self):
         return self.active
+    def get_results(self):
+        return self.results
     def get_counter(self):
         return self.counter
 
@@ -376,6 +389,8 @@ class DownloadStatus:
         self.progress = value
     def set_active(self, value):
         self.active = value
+    def set_results(self, value):
+        self.results = value
 
 class FileDownloader:
     implements(IPushProducer)
@@ -396,6 +411,7 @@ class FileDownloader:
         self._si_s = storage.si_b2a(self._storage_index)
         self.init_logging()
 
+        self._started = time.time()
         self._status = s = DownloadStatus()
         s.set_status("Starting")
         s.set_storage_index(self._storage_index)
@@ -403,6 +419,10 @@ class FileDownloader:
         s.set_helper(False)
         s.set_active(True)
 
+        self._results = DownloadResults()
+        s.set_results(self._results)
+        self._results.file_size = self._size
+
         if IConsumer.providedBy(downloadable):
             downloadable.registerProducer(self, True)
         self._downloadable = downloadable
@@ -463,6 +483,8 @@ class FileDownloader:
 
     def start(self):
         self.log("starting download")
+        if self._results:
+            self._results.timings["servers_peer_selection"] = {}
         # first step: who should we download from?
         d = defer.maybeDeferred(self._get_all_shareholders)
         d.addCallback(self._got_all_shareholders)
@@ -495,10 +517,9 @@ class FileDownloader:
         dl = []
         for (peerid,ss) in self._client.get_permuted_peers("storage",
                                                            self._storage_index):
-            peerid_s = idlib.shortnodeid_b2a(peerid)
             d = ss.callRemote("get_buckets", self._storage_index)
             d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(peerid_s,))
+                           callbackArgs=(peerid,))
             dl.append(d)
         self._responses_received = 0
         self._queries_sent = len(dl)
@@ -508,14 +529,17 @@ class FileDownloader:
                                      self._queries_sent))
         return defer.DeferredList(dl)
 
-    def _got_response(self, buckets, peerid_s):
+    def _got_response(self, buckets, peerid):
         self._responses_received += 1
+        if self._results:
+            elapsed = time.time() - self._started
+            self._results.timings["servers_peer_selection"][peerid] = elapsed
         if self._status:
             self._status.set_status("Locating Shares (%d/%d)" %
                                     (self._responses_received,
                                      self._queries_sent))
         for sharenum, bucket in buckets.iteritems():
-            b = storage.ReadBucketProxy(bucket, peerid_s, self._si_s)
+            b = storage.ReadBucketProxy(bucket, peerid, self._si_s)
             self.add_share_bucket(sharenum, b)
             self._uri_extension_sources.append(b)
 
@@ -539,6 +563,10 @@ class FileDownloader:
             del self._share_vbuckets[shnum]
 
     def _got_all_shareholders(self, res):
+        if self._results:
+            now = time.time()
+            self._results.timings["peer_selection"] = now - self._started
+
         if len(self._share_buckets) < self._num_needed_shares:
             raise NotEnoughPeersError
 
@@ -558,6 +586,7 @@ class FileDownloader:
         if self._status:
             self._status.set_status("Obtaining URI Extension")
 
+        self._uri_extension_fetch_started = time.time()
         def _validate(proposal, bucket):
             h = hashutil.uri_extension_hash(proposal)
             if h != self._uri_extension_hash:
@@ -599,6 +628,10 @@ class FileDownloader:
         return d
 
     def _got_uri_extension(self, uri_extension_data):
+        if self._results:
+            elapsed = time.time() - self._uri_extension_fetch_started
+            self._results.timings["uri_extension"] = elapsed
+
         d = self._uri_extension_data = uri_extension_data
 
         self._codec = codec.get_decoder_by_name(d['codec_name'])
@@ -621,6 +654,7 @@ class FileDownloader:
         self._share_hashtree.set_hashes({0: self._roothash})
 
     def _get_hashtrees(self, res):
+        self._get_hashtrees_started = time.time()
         if self._status:
             self._status.set_status("Retrieving Hash Trees")
         d = self._get_plaintext_hashtrees()
@@ -679,7 +713,9 @@ class FileDownloader:
     def _setup_hashtrees(self, res):
         self._output.setup_hashtrees(self._plaintext_hashtree,
                                      self._crypttext_hashtree)
-
+        if self._results:
+            elapsed = time.time() - self._get_hashtrees_started
+            self._results.timings["hashtrees"] = elapsed
 
     def _create_validated_buckets(self, ignored=None):
         self._share_vbuckets = {}
@@ -719,6 +755,8 @@ class FileDownloader:
         # RIBucketReader references.
         self.active_buckets = {} # k: shnum, v: ValidatedBucket instance
 
+        self._started_fetching = time.time()
+
         d = defer.succeed(None)
         for segnum in range(self._total_segments-1):
             d.addCallback(self._download_segment, segnum)
@@ -801,6 +839,10 @@ class FileDownloader:
 
     def _done(self, res):
         self.log("download done")
+        if self._results:
+            now = time.time()
+            self._results.timings["total"] = now - self._started
+            self._results.timings["fetching"] = now - self._started_fetching
         self._output.close()
         if self.check_crypttext_hash:
             _assert(self._crypttext_hash == self._output.crypttext_hash,
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 2d7f71ed..83604896 100644
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -1278,9 +1278,52 @@ class IUploadable(Interface):
 
 class IUploadResults(Interface):
     """I am returned by upload() methods. I contain a number of public
-    attributes which can be read to determine the results of the upload::
+    attributes which can be read to determine the results of the upload. Some
+    of these are functional, some are timing information. All of these may be
+    None::
 
+     .file_size : the size of the file, in bytes
      .uri : the CHK read-cap for the file
+     .ciphertext_fetched : how many bytes were fetched by the helper
+     .sharemap : dict mapping share number to placement string
+     .servermap : dict mapping server peerid to a set of share numbers
+     .timings : dict of timing information, mapping name to seconds (float)
+       total : total upload time, start to finish
+       storage_index : time to compute the storage index
+       peer_selection : time to decide which peers will be used
+       contacting_helper : initial helper query to upload/no-upload decision
+       existence_check : helper pre-upload existence check
+       helper_total : initial helper query to helper finished pushing
+       cumulative_fetch : helper waiting for ciphertext requests
+       total_fetch : helper start to last ciphertext response
+       cumulative_encoding : just time spent in zfec
+       cumulative_sending : just time spent waiting for storage servers
+       hashes_and_close : last segment push to shareholder close
+       total_encode_and_push : first encode to shareholder close
+
     """
 
+class IDownloadResults(Interface):
+    """I am created internally by download() methods. I contain a number of
+    public attributes which contain details about the download process::
+
+     .file_size : the size of the file, in bytes
+     .servers_used : set of server peerids that were used during download
+     .server_problems : dict mapping server peerid to a problem string. Only
+                        servers that had problems (bad hashes, disconnects)
+                        are listed here.
+     .servermap : dict mapping server peerid to a set of share numbers. Only
+                  servers that had any shares are listed here.
+     .timings : dict of timing information, mapping name to seconds (float)
+       peer_selection : time to ask servers about shares
+       servers_peer_selection : dict of peerid to DYHB-query time
+       uri_extension : time to fetch a copy of the URI extension block
+       hashtrees : time to fetch the hash trees
+       fetching : time to fetch, decode, and deliver segments
+       cumulative_fetching : time spent waiting for storage servers
+       cumulative_decoding : just time spent in zfec
+       total : total download time, start to finish
+       servers_fetching : dict of peerid to list of per-segment fetch times
+
+    """
+
diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py
index f95e33e7..78e489cd 100644
--- a/src/allmydata/offloaded.py
+++ b/src/allmydata/offloaded.py
@@ -84,7 +84,7 @@ class CHKCheckerAndUEBFetcher:
             self.log("no readers, so no UEB", level=log.NOISY)
             return
         b,peerid = self._readers.pop()
-        rbp = storage.ReadBucketProxy(b, idlib.shortnodeid_b2a(peerid),
+        rbp = storage.ReadBucketProxy(b, peerid,
                                       storage.si_b2a(self._storage_index))
         d = rbp.startIfNecessary()
         d.addCallback(lambda res: rbp.get_uri_extension())
diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
index a812906a..f3705181 100644
--- a/src/allmydata/storage.py
+++ b/src/allmydata/storage.py
@@ -1201,14 +1201,15 @@ class WriteBucketProxy:
 class ReadBucketProxy:
     implements(IStorageBucketReader)
 
-    def __init__(self, rref, peerid_s=None, storage_index_s=None):
+    def __init__(self, rref, peerid=None, storage_index_s=None):
         self._rref = rref
-        self._peerid_s = peerid_s
+        self._peerid = peerid
        self._si_s = storage_index_s
         self._started = False
 
     def __repr__(self):
-        return "<ReadBucketProxy to peer [%s] SI %s>" % (self._peerid_s,
+        peerid_s = idlib.shortnodeid_b2a(self._peerid)
+        return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
                                                          self._si_s)
 
     def startIfNecessary(self):
diff --git a/src/allmydata/web/download-status.xhtml b/src/allmydata/web/download-status.xhtml
index b89a68b7..b773fb5e 100644
--- a/src/allmydata/web/download-status.xhtml
+++ b/src/allmydata/web/download-status.xhtml
@@ -18,6 +18,34 @@
   <li>Status: <span n:render="status"/></li>
 </ul>
 
+<div n:render="results">
+  <h2>Download Results</h2>
+  <ul>
+    <li>Servers Used: <span n:render="servers_used"/></li>
+    <li>Servermap: <span n:render="servermap"/></li>
+    <li>Timings:</li>
+    <ul>
+      <li>File Size: <span n:render="string" n:data="file_size"/> bytes</li>
+      <li>Total: <span n:render="time" n:data="time_total"/>
+          (<span n:render="rate" n:data="rate_total"/>)</li>
+      <ul>
+        <li>Peer Selection: <span n:render="time" n:data="time_peer_selection"/></li>
+        <li>UEB Fetch: <span n:render="time" n:data="time_uri_extension"/></li>
+        <li>Hashtree Fetch: <span n:render="time" n:data="time_hashtrees"/></li>
+        <li>Segment Fetch: <span n:render="time" n:data="time_fetching"/>
+            (<span n:render="rate" n:data="rate_fetching"/>)</li>
+        <ul>
+          <li>Cumulative Fetching: <span n:render="time" n:data="time_cumulative_fetch"/>
+              (<span n:render="rate" n:data="rate_fetch"/>)</li>
+          <li>Cumulative Decoding: <span n:render="time" n:data="time_cumulative_decoding"/>
+              (<span n:render="rate" n:data="rate_decode"/>)</li>
+        </ul>
+      </ul>
+    </ul>
+  </ul>
+</div>
+
 <div>Return to the <a href="/">Welcome Page</a></div>
 
   </body>
diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py
index 427e7ef7..59b9db6c 100644
--- a/src/allmydata/webish.py
+++ b/src/allmydata/webish.py
@@ -1663,9 +1663,127 @@ class UploadStatusPage(UploadResultsRendererMixin, rend.Page):
     def render_status(self, ctx, data):
         return data.get_status()
 
-class DownloadStatusPage(rend.Page):
+class DownloadResultsRendererMixin:
+    # this requires a method named 'download_results'
+
+    def render_servers_used(self, ctx, data):
+        return "nope"
+
+    def render_servermap(self, ctx, data):
+        d = self.download_results()
+        d.addCallback(lambda res: res.servermap)
+        def _render(servermap):
+            if servermap is None:
+                return "None"
+            l = T.ul()
+            for peerid in sorted(servermap.keys()):
+                peerid_s = idlib.shortnodeid_b2a(peerid)
+                shares_s = ",".join([str(shnum)
+                                     for shnum in servermap[peerid]])
+                l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+            return l
+        d.addCallback(_render)
+        return d
+
+    def data_file_size(self, ctx, data):
+        d = self.download_results()
+        d.addCallback(lambda res: res.file_size)
+        return d
+
+    def render_time(self, ctx, data):
+        # 1.23s, 790ms, 132us
+        if data is None:
+            return ""
+        s = float(data)
+        if s >= 1.0:
+            return "%.2fs" % s
+        if s >= 0.01:
+            return "%dms" % (1000*s)
+        if s >= 0.001:
+            return "%.1fms" % (1000*s)
+        return "%dus" % (1000000*s)
+
+    def render_rate(self, ctx, data):
+        # 21.8kBps, 554.4kBps, 4.37MBps
+        if data is None:
+            return ""
+        r = float(data)
+        if r > 1000000:
+            return "%1.2fMBps" % (r/1000000)
+        if r > 1000:
+            return "%.1fkBps" % (r/1000)
+        return "%dBps" % r
+
+    def _get_time(self, name):
+        d = self.download_results()
+        d.addCallback(lambda res: res.timings.get(name))
+        return d
+
+    def data_time_total(self, ctx, data):
+        return self._get_time("total")
+
+    def data_time_peer_selection(self, ctx, data):
+        return self._get_time("peer_selection")
+
+    def data_time_uri_extension(self, ctx, data):
+        return self._get_time("uri_extension")
+
+    def data_time_hashtrees(self, ctx, data):
+        return self._get_time("hashtrees")
+
+    def data_time_fetching(self, ctx, data):
+        return self._get_time("fetching")
+
+    def data_time_cumulative_fetch(self, ctx, data):
+        # note: the timing key is "cumulative_fetching", matching the
+        # IDownloadResults docstring and data_rate_fetch below
+        return self._get_time("cumulative_fetching")
+
+    def data_time_cumulative_decoding(self, ctx, data):
+        return self._get_time("cumulative_decoding")
+
+    def _get_rate(self, name):
+        d = self.download_results()
+        def _convert(r):
+            file_size = r.file_size
+            time = r.timings.get(name)
+            if time is None:
+                return None
+            try:
+                return 1.0 * file_size / time
+            except ZeroDivisionError:
+                return None
+        d.addCallback(_convert)
+        return d
+
+    def data_rate_total(self, ctx, data):
+        return self._get_rate("total")
+
+    def data_rate_fetching(self, ctx, data):
+        return self._get_rate("fetching")
+
+    def data_rate_decode(self, ctx, data):
+        return self._get_rate("cumulative_decoding")
+
+    def data_rate_fetch(self, ctx, data):
+        return self._get_rate("cumulative_fetching")
+
+class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
     docFactory = getxmlfile("download-status.xhtml")
 
+    def __init__(self, data):
+        rend.Page.__init__(self, data)
+        self.download_status = data
+
+    def download_results(self):
+        return defer.maybeDeferred(self.download_status.get_results)
+
+    def render_results(self, ctx, data):
+        d = self.download_results()
+        def _got_results(results):
+            if results:
+                return ctx.tag
+            return ""
+        d.addCallback(_got_results)
+        return d
+
     def render_si(self, ctx, data):
         si_s = base32.b2a_or_none(data.get_storage_index())
         if si_s is None:
-- 
2.37.2
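
A minimal sketch of how the new IDownloadResults attributes might be
consumed from code rather than through the status page. The
show_download_results() helper is hypothetical; only get_results(),
.file_size, and the .timings keys documented in IDownloadResults come
from this patch (Python 2, to match the codebase):

    # Hypothetical consumer of a DownloadStatus instance, e.g. one taken
    # from the client's list of recent downloads.
    def show_download_results(status):
        results = status.get_results()  # None until set_results() is called
        if results is None:
            print "no results recorded yet"
            return
        print "file size: %s bytes" % results.file_size
        # each key may be absent if that phase was never reached
        for name in ("peer_selection", "uri_extension", "hashtrees",
                     "fetching", "total"):
            elapsed = results.timings.get(name)
            if elapsed is not None:
                print "  %s: %.3fs" % (name, elapsed)
        # servers_peer_selection maps each queried peerid (a binary
        # nodeid) to its DYHB query latency
        dyhb = results.timings.get("servers_peer_selection", {})
        for peerid, elapsed in dyhb.items():
            print "  DYHB [%s]: %.3fs" % (peerid.encode("hex")[:8], elapsed)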
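
The render_time and render_rate helpers pick display units by magnitude.
Here is a standalone restatement of the same thresholds, with sample
outputs, for reading the template above; the function names are
illustrative, not part of the patch:

    def format_time(s):
        s = float(s)
        if s >= 1.0:
            return "%.2fs" % s          # e.g. 1.23s
        if s >= 0.01:
            return "%dms" % (1000*s)    # e.g. 790ms
        if s >= 0.001:
            return "%.1fms" % (1000*s)  # e.g. 1.3ms
        return "%dus" % (1000000*s)     # e.g. 132us

    def format_rate(r):
        r = float(r)
        if r > 1000000:
            return "%1.2fMBps" % (r/1000000)  # e.g. 4.37MBps
        if r > 1000:
            return "%.1fkBps" % (r/1000)      # e.g. 554.4kBps
        return "%dBps" % r                    # e.g. 21Bps

    print format_time(0.79)     # prints 790ms
    print format_rate(554400)   # prints 554.4kBps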