From def910c39125c6eb345a1e70011f558f2222ff1c Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 3 Mar 2008 20:09:32 -0700 Subject: [PATCH] webish download results: add servermap, decrypt time --- src/allmydata/download.py | 57 +++++++++++++++++++++++-- src/allmydata/interfaces.py | 13 +++--- src/allmydata/web/download-status.xhtml | 8 ++-- src/allmydata/webish.py | 43 +++++++++++++------ 4 files changed, 96 insertions(+), 25 deletions(-) diff --git a/src/allmydata/download.py b/src/allmydata/download.py index 87b039f2..91c163db 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -422,6 +422,10 @@ class FileDownloader: self._results = DownloadResults() s.set_results(self._results) self._results.file_size = self._size + self._results.timings["servers_peer_selection"] = {} + self._results.timings["cumulative_fetch"] = 0.0 + self._results.timings["cumulative_decode"] = 0.0 + self._results.timings["cumulative_decrypt"] = 0.0 if IConsumer.providedBy(downloadable): downloadable.registerProducer(self, True) @@ -483,8 +487,6 @@ class FileDownloader: def start(self): self.log("starting download") - if self._results: - self._results.timings["servers_peer_selection"] = {} # first step: who should we download from? d = defer.maybeDeferred(self._get_all_shareholders) d.addCallback(self._got_all_shareholders) @@ -499,6 +501,7 @@ class FileDownloader: if self._status: self._status.set_status("Finished") self._status.set_active(False) + self._status.set_paused(False) if IConsumer.providedBy(self._downloadable): self._downloadable.unregisterProducer() return res @@ -542,6 +545,10 @@ class FileDownloader: b = storage.ReadBucketProxy(bucket, peerid, self._si_s) self.add_share_bucket(sharenum, b) self._uri_extension_sources.append(b) + if self._results: + if peerid not in self._results.servermap: + self._results.servermap[peerid] = set() + self._results.servermap[peerid].add(sharenum) def add_share_bucket(self, sharenum, bucket): # this is split out for the benefit of test_encode.py @@ -785,15 +792,33 @@ class FileDownloader: # memory footprint: when the SegmentDownloader finishes pulling down # all shares, we have 1*segment_size of usage. segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares) + started = time.time() d = segmentdler.start() + def _finished_fetching(res): + elapsed = time.time() - started + self._results.timings["cumulative_fetch"] += elapsed + return res + if self._results: + d.addCallback(_finished_fetching) # pause before using more memory d.addCallback(self._check_for_pause) # while the codec does its job, we hit 2*segment_size + def _started_decode(res): + self._started_decode = time.time() + return res + if self._results: + d.addCallback(_started_decode) d.addCallback(lambda (shares, shareids): self._codec.decode(shares, shareids)) # once the codec is done, we drop back to 1*segment_size, because # 'shares' goes out of scope. The memory usage is all in the # plaintext now, spread out into a bunch of tiny buffers. + def _finished_decode(res): + elapsed = time.time() - self._started_decode + self._results.timings["cumulative_decode"] += elapsed + return res + if self._results: + d.addCallback(_finished_decode) # pause/check-for-stop just before writing, to honor stopProducing d.addCallback(self._check_for_pause) @@ -808,7 +833,11 @@ class FileDownloader: # we're down to 1*segment_size right now, but write_segment() # will decrypt a copy of the segment internally, which will push # us up to 2*segment_size while it runs. + started_decrypt = time.time() self._output.write_segment(segment) + if self._results: + elapsed = time.time() - started_decrypt + self._results.timings["cumulative_decrypt"] += elapsed d.addCallback(_done) return d @@ -817,11 +846,29 @@ class FileDownloader: % (segnum, self._total_segments, 100.0 * segnum / self._total_segments)) segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares) + started = time.time() d = segmentdler.start() + def _finished_fetching(res): + elapsed = time.time() - started + self._results.timings["cumulative_fetch"] += elapsed + return res + if self._results: + d.addCallback(_finished_fetching) # pause before using more memory d.addCallback(self._check_for_pause) + def _started_decode(res): + self._started_decode = time.time() + return res + if self._results: + d.addCallback(_started_decode) d.addCallback(lambda (shares, shareids): self._tail_codec.decode(shares, shareids)) + def _finished_decode(res): + elapsed = time.time() - self._started_decode + self._results.timings["cumulative_decode"] += elapsed + return res + if self._results: + d.addCallback(_finished_decode) # pause/check-for-stop just before writing, to honor stopProducing d.addCallback(self._check_for_pause) def _done(buffers): @@ -833,7 +880,11 @@ class FileDownloader: pad_size = mathutil.pad_size(self._size, self._segment_size) tail_size = self._segment_size - pad_size segment = segment[:tail_size] + started_decrypt = time.time() self._output.write_segment(segment) + if self._results: + elapsed = time.time() - started_decrypt + self._results.timings["cumulative_decrypt"] += elapsed d.addCallback(_done) return d @@ -842,7 +893,7 @@ class FileDownloader: if self._results: now = time.time() self._results.timings["total"] = now - self._started - self._results.timings["fetching"] = now - self._started_fetching + self._results.timings["segments"] = now - self._started_fetching self._output.close() if self.check_crypttext_hash: _assert(self._crypttext_hash == self._output.crypttext_hash, diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 83604896..3bcb907d 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1307,21 +1307,22 @@ class IDownloadResults(Interface): """I am created internally by download() methods. I contain a number of public attributes which contain details about the download process.:: - .file_size : the size of the file, in bytes + .file_size : the size of the file, in bytes .servers_used : set of server peerids that were used during download .server_problems : dict mapping server peerid to a problem string. Only servers that had problems (bad hashes, disconnects) are listed here. - .servermap : dict mapping server peerid to a set of share numbers. Only - servers that had any shares are listed here. + .servermap : dict mapping server peerid to a set of share numbers. Only + servers that had any shares are listed here. .timings : dict of timing information, mapping name to seconds (float) peer_selection : time to ask servers about shares servers_peer_selection : dict of peerid to DYHB-query time uri_extension : time to fetch a copy of the URI extension block hashtrees : time to fetch the hash trees - fetching : time to fetch, decode, and deliver segments - cumulative_fetching : time spent waiting for storage servers - cumulative_decoding : just time spent in zfec + segments : time to fetch, decode, and deliver segments + cumulative_fetch : time spent waiting for storage servers + cumulative_decode : just time spent in zfec + cumulative_decrypt : just time spent in decryption total : total download time, start to finish servers_fetching : dict of peerid to list of per-segment fetch times diff --git a/src/allmydata/web/download-status.xhtml b/src/allmydata/web/download-status.xhtml index b773fb5e..d60e7efc 100644 --- a/src/allmydata/web/download-status.xhtml +++ b/src/allmydata/web/download-status.xhtml @@ -33,13 +33,15 @@
  • Peer Selection:
  • UEB Fetch:
  • Hashtree Fetch:
  • -
  • Segment Fetch: - ()
  • +
  • Segment Fetch: + ()
  • diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py index 59b9db6c..db65d935 100644 --- a/src/allmydata/webish.py +++ b/src/allmydata/webish.py @@ -1372,6 +1372,11 @@ class UnlinkedPUTCreateDirectory(rend.Page): # XXX add redirect_to_result return d +def plural(sequence): + if len(sequence) == 1: + return "" + return "s" + class UploadResultsRendererMixin: # this requires a method named 'upload_results' @@ -1397,8 +1402,11 @@ class UploadResultsRendererMixin: l = T.ul() for peerid in sorted(servermap.keys()): peerid_s = idlib.shortnodeid_b2a(peerid) - shares_s = ",".join([str(shnum) for shnum in servermap[peerid]]) - l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]] + shares_s = ",".join(["#%d" % shnum + for shnum in servermap[peerid]]) + l[T.li["[%s] got share%s: %s" % (peerid_s, + plural(servermap[peerid]), + shares_s)]] return l d.addCallback(_render) return d @@ -1678,8 +1686,11 @@ class DownloadResultsRendererMixin: l = T.ul() for peerid in sorted(servermap.keys()): peerid_s = idlib.shortnodeid_b2a(peerid) - shares_s = ",".join([str(shnum) for shnum in servermap[peerid]]) - l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]] + shares_s = ",".join(["#%d" % shnum + for shnum in servermap[peerid]]) + l[T.li["[%s] has share%s: %s" % (peerid_s, + plural(servermap[peerid]), + shares_s)]] return l d.addCallback(_render) return d @@ -1730,14 +1741,17 @@ class DownloadResultsRendererMixin: def data_time_hashtrees(self, ctx, data): return self._get_time("hashtrees") - def data_time_fetching(self, ctx, data): - return self._get_time("fetching") + def data_time_segments(self, ctx, data): + return self._get_time("segments") def data_time_cumulative_fetch(self, ctx, data): return self._get_time("cumulative_fetch") - def data_time_cumulative_decoding(self, ctx, data): - return self._get_time("cumulative_decoding") + def data_time_cumulative_decode(self, ctx, data): + return self._get_time("cumulative_decode") + + def data_time_cumulative_decrypt(self, ctx, data): + return self._get_time("cumulative_decrypt") def _get_rate(self, name): d = self.download_results() @@ -1756,14 +1770,17 @@ class DownloadResultsRendererMixin: def data_rate_total(self, ctx, data): return self._get_rate("total") - def data_rate_fetching(self, ctx, data): - return self._get_rate("fetching") + def data_rate_segments(self, ctx, data): + return self._get_rate("segments") + + def data_rate_fetch(self, ctx, data): + return self._get_rate("cumulative_fetch") def data_rate_decode(self, ctx, data): - return self._get_rate("cumulative_decoding") + return self._get_rate("cumulative_decode") - def data_rate_fetch(self, ctx, data): - return self._get_rate("cumulative_fetching") + def data_rate_decrypt(self, ctx, data): + return self._get_rate("cumulative_decrypt") class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page): docFactory = getxmlfile("download-status.xhtml") -- 2.45.2