]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
webish: add 'download results', with some basic timing information
authorBrian Warner <warner@allmydata.com>
Tue, 4 Mar 2008 02:19:21 +0000 (19:19 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 4 Mar 2008 02:19:21 +0000 (19:19 -0700)
src/allmydata/checker.py
src/allmydata/download.py
src/allmydata/interfaces.py
src/allmydata/offloaded.py
src/allmydata/storage.py
src/allmydata/web/download-status.xhtml
src/allmydata/webish.py

index b0855817df47384de1a58486c1bcb68ae1c24251..3ac3e11f8690e8a73b9b11f1452ac71fb3533b19 100644 (file)
@@ -122,6 +122,7 @@ class SimpleCHKFileVerifier(download.FileDownloader):
         self._paused = False
         self._stopped = False
 
+        self._results = None
         self.active_buckets = {} # k: shnum, v: bucket
         self._share_buckets = [] # list of (sharenum, bucket) tuples
         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
index 907f9387ebb42011af552feb4d3558706279e0a2..87b039f273bf7beba85fc6ea5071e99fb1e63544 100644 (file)
@@ -1,16 +1,16 @@
 
-import os, random, weakref, itertools
+import os, random, weakref, itertools, time
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.internet.interfaces import IPushProducer, IConsumer
 from twisted.application import service
 from foolscap.eventual import eventually
 
-from allmydata.util import base32, mathutil, hashutil, log, idlib
+from allmydata.util import base32, mathutil, hashutil, log
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree, storage, uri
 from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
-     IDownloadStatus
+     IDownloadStatus, IDownloadResults
 from allmydata.encode import NotEnoughPeersError
 from pycryptopp.cipher.aes import AES
 
@@ -28,6 +28,16 @@ class BadCrypttextHashValue(Exception):
 class DownloadStopped(Exception):
     pass
 
+class DownloadResults:
+    implements(IDownloadResults)
+
+    def __init__(self):
+        self.servers_used = set()
+        self.server_problems = {}
+        self.servermap = {}
+        self.timings = {}
+        self.file_size = None
+
 class Output:
     def __init__(self, downloadable, key, total_length, log_parent,
                  download_status):
@@ -338,6 +348,7 @@ class DownloadStatus:
         self.paused = False
         self.stopped = False
         self.active = True
+        self.results = None
         self.counter = self.statusid_counter.next()
 
     def get_storage_index(self):
@@ -357,6 +368,8 @@ class DownloadStatus:
         return self.progress
     def get_active(self):
         return self.active
+    def get_results(self):
+        return self.results
     def get_counter(self):
         return self.counter
 
@@ -376,6 +389,8 @@ class DownloadStatus:
         self.progress = value
     def set_active(self, value):
         self.active = value
+    def set_results(self, value):
+        self.results = value
 
 class FileDownloader:
     implements(IPushProducer)
@@ -396,6 +411,7 @@ class FileDownloader:
         self._si_s = storage.si_b2a(self._storage_index)
         self.init_logging()
 
+        self._started = time.time()
         self._status = s = DownloadStatus()
         s.set_status("Starting")
         s.set_storage_index(self._storage_index)
@@ -403,6 +419,10 @@ class FileDownloader:
         s.set_helper(False)
         s.set_active(True)
 
+        self._results = DownloadResults()
+        s.set_results(self._results)
+        self._results.file_size = self._size
+
         if IConsumer.providedBy(downloadable):
             downloadable.registerProducer(self, True)
         self._downloadable = downloadable
@@ -463,6 +483,8 @@ class FileDownloader:
     def start(self):
         self.log("starting download")
 
+        if self._results:
+            self._results.timings["servers_peer_selection"] = {}
         # first step: who should we download from?
         d = defer.maybeDeferred(self._get_all_shareholders)
         d.addCallback(self._got_all_shareholders)
@@ -495,10 +517,9 @@ class FileDownloader:
         dl = []
         for (peerid,ss) in self._client.get_permuted_peers("storage",
                                                            self._storage_index):
-            peerid_s = idlib.shortnodeid_b2a(peerid)
             d = ss.callRemote("get_buckets", self._storage_index)
             d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(peerid_s,))
+                           callbackArgs=(peerid,))
             dl.append(d)
         self._responses_received = 0
         self._queries_sent = len(dl)
@@ -508,14 +529,17 @@ class FileDownloader:
                                      self._queries_sent))
         return defer.DeferredList(dl)
 
-    def _got_response(self, buckets, peerid_s):
+    def _got_response(self, buckets, peerid):
         self._responses_received += 1
+        if self._results:
+            elapsed = time.time() - self._started
+            self._results.timings["servers_peer_selection"][peerid] = elapsed
         if self._status:
             self._status.set_status("Locating Shares (%d/%d)" %
                                     (self._responses_received,
                                      self._queries_sent))
         for sharenum, bucket in buckets.iteritems():
-            b = storage.ReadBucketProxy(bucket, peerid_s, self._si_s)
+            b = storage.ReadBucketProxy(bucket, peerid, self._si_s)
             self.add_share_bucket(sharenum, b)
             self._uri_extension_sources.append(b)
 
@@ -539,6 +563,10 @@ class FileDownloader:
             del self._share_vbuckets[shnum]
 
     def _got_all_shareholders(self, res):
+        if self._results:
+            now = time.time()
+            self._results.timings["peer_selection"] = now - self._started
+
         if len(self._share_buckets) < self._num_needed_shares:
             raise NotEnoughPeersError
 
@@ -558,6 +586,7 @@ class FileDownloader:
         if self._status:
             self._status.set_status("Obtaining URI Extension")
 
+        self._uri_extension_fetch_started = time.time()
         def _validate(proposal, bucket):
             h = hashutil.uri_extension_hash(proposal)
             if h != self._uri_extension_hash:
@@ -599,6 +628,10 @@ class FileDownloader:
         return d
 
     def _got_uri_extension(self, uri_extension_data):
+        if self._results:
+            elapsed = time.time() - self._uri_extension_fetch_started
+            self._results.timings["uri_extension"] = elapsed
+
         d = self._uri_extension_data = uri_extension_data
 
         self._codec = codec.get_decoder_by_name(d['codec_name'])
@@ -621,6 +654,7 @@ class FileDownloader:
         self._share_hashtree.set_hashes({0: self._roothash})
 
     def _get_hashtrees(self, res):
+        self._get_hashtrees_started = time.time()
         if self._status:
             self._status.set_status("Retrieving Hash Trees")
         d = self._get_plaintext_hashtrees()
@@ -679,7 +713,9 @@ class FileDownloader:
     def _setup_hashtrees(self, res):
         self._output.setup_hashtrees(self._plaintext_hashtree,
                                      self._crypttext_hashtree)
-
+        if self._results:
+            elapsed = time.time() - self._get_hashtrees_started
+            self._results.timings["hashtrees"] = elapsed
 
     def _create_validated_buckets(self, ignored=None):
         self._share_vbuckets = {}
@@ -719,6 +755,8 @@ class FileDownloader:
         # RIBucketReader references.
         self.active_buckets = {} # k: shnum, v: ValidatedBucket instance
 
+        self._started_fetching = time.time()
+
         d = defer.succeed(None)
         for segnum in range(self._total_segments-1):
             d.addCallback(self._download_segment, segnum)
@@ -801,6 +839,10 @@ class FileDownloader:
 
     def _done(self, res):
         self.log("download done")
+        if self._results:
+            now = time.time()
+            self._results.timings["total"] = now - self._started
+            self._results.timings["fetching"] = now - self._started_fetching
         self._output.close()
         if self.check_crypttext_hash:
             _assert(self._crypttext_hash == self._output.crypttext_hash,
index 2d7f71edea2fc3c43fdc8e98c47ad6cf41411223..83604896865e41751defab87486cf96b315f62c4 100644 (file)
@@ -1278,9 +1278,52 @@ class IUploadable(Interface):
 
 class IUploadResults(Interface):
     """I am returned by upload() methods. I contain a number of public
-    attributes which can be read to determine the results of the upload::
+    attributes which can be read to determine the results of the upload. Some
+    of these are functional, some are timing information. All of these may be
+    None.::
 
+     .file_size : the size of the file, in bytes
      .uri : the CHK read-cap for the file
+     .ciphertext_fetched : how many bytes were fetched by the helper
+     .sharemap : dict mapping share number to placement string
+     .servermap : dict mapping server peerid to a set of share numbers
+     .timings : dict of timing information, mapping name to seconds (float)
+       total : total upload time, start to finish
+       storage_index : time to compute the storage index
+       peer_selection : time to decide which peers will be used
+       contacting_helper : initial helper query to upload/no-upload decision
+       existence_check : helper pre-upload existence check
+       helper_total : initial helper query to helper finished pushing
+       cumulative_fetch : helper waiting for ciphertext requests
+       total_fetch : helper start to last ciphertext response
+       cumulative_encoding : just time spent in zfec
+       cumulative_sending : just time spent waiting for storage servers
+       hashes_and_close : last segment push to shareholder close
+       total_encode_and_push : first encode to shareholder close
+
+    """
+
+class IDownloadResults(Interface):
+    """I am created internally by download() methods. I contain a number of
+    public attributes which contain details about the download process.::
+
+      .file_size : the size of the file, in bytes
+      .servers_used : set of server peerids that were used during download
+      .server_problems : dict mapping server peerid to a problem string. Only
+                         servers that had problems (bad hashes, disconnects) are
+                         listed here.
+      .servermap : dict mapping server peerid to a set of share numbers. Only
+                   servers that had any shares are listed here.
+     .timings : dict of timing information, mapping name to seconds (float)
+       peer_selection : time to ask servers about shares
+       servers_peer_selection : dict of peerid to DYHB-query time
+       uri_extension : time to fetch a copy of the URI extension block
+       hashtrees : time to fetch the hash trees
+       fetching : time to fetch, decode, and deliver segments
+        cumulative_fetching : time spent waiting for storage servers
+        cumulative_decoding : just time spent in zfec
+       total : total download time, start to finish
+        servers_fetching : dict of peerid to list of per-segment fetch times
 
     """
 
index f95e33e74ee16b0379695083f9c5f55ef093ba58..78e489cde411a1e03070da85036283513be4e5ae 100644 (file)
@@ -84,7 +84,7 @@ class CHKCheckerAndUEBFetcher:
             self.log("no readers, so no UEB", level=log.NOISY)
             return
         b,peerid = self._readers.pop()
-        rbp = storage.ReadBucketProxy(b, idlib.shortnodeid_b2a(peerid),
+        rbp = storage.ReadBucketProxy(b, peerid,
                                       storage.si_b2a(self._storage_index))
         d = rbp.startIfNecessary()
         d.addCallback(lambda res: rbp.get_uri_extension())
index a812906a1155c20d62e235eabe2be74e46caa01b..f3705181291e65c881b803a4562959e618f4752c 100644 (file)
@@ -1201,14 +1201,15 @@ class WriteBucketProxy:
 
 class ReadBucketProxy:
     implements(IStorageBucketReader)
-    def __init__(self, rref, peerid_s=None, storage_index_s=None):
+    def __init__(self, rref, peerid=None, storage_index_s=None):
         self._rref = rref
-        self._peerid_s = peerid_s
+        self._peerid = peerid
         self._si_s = storage_index_s
         self._started = False
 
     def __repr__(self):
-        return "<ReadBucketProxy to peer [%s] SI %s>" % (self._peerid_s,
+        peerid_s = idlib.shortnodeid_b2a(self._peerid)
+        return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
                                                          self._si_s)
 
     def startIfNecessary(self):
index b89a68b71d9ba696b6a87b244d8983d4434fab61..b773fb5e56126936ce47bfb932fe48f0e962c699 100644 (file)
   <li>Status: <span n:render="status"/></li>
 </ul>
 
+
+<div n:render="results">
+  <h2>Download Results</h2>
+  <ul>
+    <li>Servers Used: <span n:render="servers_used" /></li>
+    <li>Servermap: <span n:render="servermap" /></li>
+    <li>Timings:</li>
+    <ul>
+      <li>File Size: <span n:render="string" n:data="file_size" /> bytes</li>
+      <li>Total: <span n:render="time" n:data="time_total" />
+      (<span n:render="rate" n:data="rate_total" />)</li>
+      <ul>
+        <li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
+        <li>UEB Fetch: <span n:render="time" n:data="time_uri_extension" /></li>
+        <li>Hashtree Fetch: <span n:render="time" n:data="time_hashtrees" /></li>
+        <li>Segment Fetch: <span n:render="time" n:data="time_fetching" />
+        (<span n:render="rate" n:data="rate_fetching" />)</li>
+        <ul>
+          <li>Cumulative Fetching: <span n:render="time" n:data="time_cumulative_fetch" />
+          (<span n:render="rate" n:data="rate_fetch" />)</li>
+          <li>Cumulative Decoding: <span n:render="time" n:data="time_cumulative_decoding" />
+          (<span n:render="rate" n:data="rate_decode" />)</li>
+        </ul>
+      </ul>
+    </ul>
+  </ul>
+</div>
+
 <div>Return to the <a href="/">Welcome Page</a></div>
 
   </body>
index 427e7ef72b7bd0bcee68f8cdb79f4d2febb7faea..59b9db6cc3af6498308ddf69d90c62150d46e44d 100644 (file)
@@ -1663,9 +1663,127 @@ class UploadStatusPage(UploadResultsRendererMixin, rend.Page):
     def render_status(self, ctx, data):
         return data.get_status()
 
-class DownloadStatusPage(rend.Page):
+class DownloadResultsRendererMixin:
+    # this requires a method named 'download_results'
+
+    def render_servers_used(self, ctx, data):
+        return "nope"
+
+    def render_servermap(self, ctx, data):
+        d = self.download_results()
+        d.addCallback(lambda res: res.servermap)
+        def _render(servermap):
+            if servermap is None:
+                return "None"
+            l = T.ul()
+            for peerid in sorted(servermap.keys()):
+                peerid_s = idlib.shortnodeid_b2a(peerid)
+                shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
+                l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+            return l
+        d.addCallback(_render)
+        return d
+
+    def data_file_size(self, ctx, data):
+        d = self.download_results()
+        d.addCallback(lambda res: res.file_size)
+        return d
+
+    def render_time(self, ctx, data):
+        # 1.23s, 790ms, 132us
+        if data is None:
+            return ""
+        s = float(data)
+        if s >= 1.0:
+            return "%.2fs" % s
+        if s >= 0.01:
+            return "%dms" % (1000*s)
+        if s >= 0.001:
+            return "%.1fms" % (1000*s)
+        return "%dus" % (1000000*s)
+
+    def render_rate(self, ctx, data):
+        # 21.8kBps, 554.4kBps 4.37MBps
+        if data is None:
+            return ""
+        r = float(data)
+        if r > 1000000:
+            return "%1.2fMBps" % (r/1000000)
+        if r > 1000:
+            return "%.1fkBps" % (r/1000)
+        return "%dBps" % r
+
+    def _get_time(self, name):
+        d = self.download_results()
+        d.addCallback(lambda res: res.timings.get(name))
+        return d
+
+    def data_time_total(self, ctx, data):
+        return self._get_time("total")
+
+    def data_time_peer_selection(self, ctx, data):
+        return self._get_time("peer_selection")
+
+    def data_time_uri_extension(self, ctx, data):
+        return self._get_time("uri_extension")
+
+    def data_time_hashtrees(self, ctx, data):
+        return self._get_time("hashtrees")
+
+    def data_time_fetching(self, ctx, data):
+        return self._get_time("fetching")
+
+    def data_time_cumulative_fetch(self, ctx, data):
+        return self._get_time("cumulative_fetch")
+
+    def data_time_cumulative_decoding(self, ctx, data):
+        return self._get_time("cumulative_decoding")
+
+    def _get_rate(self, name):
+        d = self.download_results()
+        def _convert(r):
+            file_size = r.file_size
+            time = r.timings.get(name)
+            if time is None:
+                return None
+            try:
+                return 1.0 * file_size / time
+            except ZeroDivisionError:
+                return None
+        d.addCallback(_convert)
+        return d
+
+    def data_rate_total(self, ctx, data):
+        return self._get_rate("total")
+
+    def data_rate_fetching(self, ctx, data):
+        return self._get_rate("fetching")
+
+    def data_rate_decode(self, ctx, data):
+        return self._get_rate("cumulative_decoding")
+
+    def data_rate_fetch(self, ctx, data):
+        return self._get_rate("cumulative_fetching")
+
+class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
     docFactory = getxmlfile("download-status.xhtml")
 
+    def __init__(self, data):
+        rend.Page.__init__(self, data)
+        self.download_status = data
+
+    def download_results(self):
+        return defer.maybeDeferred(self.download_status.get_results)
+
+    def render_results(self, ctx, data):
+        d = self.download_results()
+        def _got_results(results):
+            if results:
+                return ctx.tag
+            return ""
+        d.addCallback(_got_results)
+        return d
+
     def render_si(self, ctx, data):
         si_s = base32.b2a_or_none(data.get_storage_index())
         if si_s is None: