add upload timings and rates to the POST /uri?t=upload results page
authorBrian Warner <warner@allmydata.com>
Wed, 6 Feb 2008 07:41:51 +0000 (00:41 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 6 Feb 2008 07:41:51 +0000 (00:41 -0700)
src/allmydata/encode.py
src/allmydata/upload.py
src/allmydata/web/unlinked-upload.xhtml
src/allmydata/webish.py

index 66c05daab1591cb208627901d68b0d105eb80f99..39f1e904e06975d233aad9c66b5398375038642f 100644 (file)
@@ -1,5 +1,6 @@
 # -*- test-case-name: allmydata.test.test_encode -*-
 
+import time
 from zope.interface import implements
 from twisted.internet import defer
 from foolscap import eventual
@@ -207,6 +208,14 @@ class Encoder(object):
         # that we sent to that landlord.
         self.share_root_hashes = [None] * self.num_shares
 
+        self._times = {
+            "cumulative_encoding": 0.0,
+            "cumulative_sending": 0.0,
+            "hashes_and_close": 0.0,
+            "total_encode_and_push": 0.0,
+            }
+        self._start_total_timestamp = time.time()
+
         d = eventual.fireEventually()
 
         d.addCallback(lambda res: self.start_all_shareholders())
@@ -269,6 +278,7 @@ class Encoder(object):
 
     def _encode_segment(self, segnum):
         codec = self._codec
+        start = time.time()
 
         # the ICodecEncoder API wants to receive a total of self.segment_size
         # bytes on each encode() call, broken up into a number of
@@ -297,17 +307,23 @@ class Encoder(object):
 
         d = self._gather_data(self.required_shares, input_piece_size,
                               crypttext_segment_hasher)
-        def _done(chunks):
+        def _done_gathering(chunks):
             for c in chunks:
                 assert len(c) == input_piece_size
             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
             # during this call, we hit 5*segsize memory
             return codec.encode(chunks)
+        d.addCallback(_done_gathering)
+        def _done(res):
+            elapsed = time.time() - start
+            self._times["cumulative_encoding"] += elapsed
+            return res
         d.addCallback(_done)
         return d
 
     def _encode_tail_segment(self, segnum):
 
+        start = time.time()
         codec = self._tail_codec
         input_piece_size = codec.get_block_size()
 
@@ -316,13 +332,18 @@ class Encoder(object):
         d = self._gather_data(self.required_shares, input_piece_size,
                               crypttext_segment_hasher,
                               allow_short=True)
-        def _done(chunks):
+        def _done_gathering(chunks):
             for c in chunks:
                 # a short trailing chunk will have been padded by
                 # _gather_data
                 assert len(c) == input_piece_size
             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
             return codec.encode(chunks)
+        d.addCallback(_done_gathering)
+        def _done(res):
+            elapsed = time.time() - start
+            self._times["cumulative_encoding"] += elapsed
+            return res
         d.addCallback(_done)
         return d
 
@@ -386,6 +407,7 @@ class Encoder(object):
         # *doesn't* have a share, that's an error.
         _assert(set(self.landlords.keys()).issubset(set(shareids)),
                 shareids=shareids, landlords=self.landlords)
+        start = time.time()
         dl = []
         lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
         for i in range(len(shares)):
@@ -410,6 +432,8 @@ class Encoder(object):
                       100 * (segnum+1) / self.num_segments,
                       ),
                      level=log.OPERATIONAL)
+            elapsed = time.time() - start
+            self._times["cumulative_sending"] += elapsed
             return res
         dl.addCallback(_logit)
         return dl
@@ -463,6 +487,7 @@ class Encoder(object):
         return d
 
     def finish_hashing(self):
+        self._start_hashing_and_close_timestamp = time.time()
         crypttext_hash = self._crypttext_hasher.digest()
         self.uri_extension_data["crypttext_hash"] = crypttext_hash
         d = self._uploadable.get_plaintext_hash()
@@ -607,6 +632,14 @@ class Encoder(object):
 
     def done(self):
         self.log("upload done", level=log.OPERATIONAL)
+        now = time.time()
+        h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
+        self._times["hashes_and_close"] = h_and_c_elapsed
+        total_elapsed = now - self._start_total_timestamp
+        self._times["total_encode_and_push"] = total_elapsed
+
+        # update our sharemap
+        self._shares_placed = set(self.landlords.keys())
         return (self.uri_extension_hash, self.required_shares,
                 self.num_shares, self.file_size)
 
@@ -628,3 +661,18 @@ class Encoder(object):
             return f
         d.addCallback(_done)
         return d
+
+    def get_shares_placed(self):
+        # return a set of share numbers that were successfully placed.
+        return self._shares_placed
+
+    def get_times(self):
+        # return a dictionary of encode+push timings
+        return self._times
+    def get_rates(self):
+        # return a dictionary of encode+push speeds
+        rates = {
+            "encode": self.file_size / self._times["cumulative_encoding"],
+            "push": self.file_size / self._times["cumulative_sending"],
+            }
+        return rates
index 659fb1b2d0baf42aa37d1a62f0b969d1f0175c91..69a42c6047d74fcda91bd3f4e74b5aa9ef0a092c 100644 (file)
@@ -1,5 +1,5 @@
 
-import os
+import os, time
 from zope.interface import implements
 from twisted.python import failure
 from twisted.internet import defer
@@ -38,6 +38,12 @@ class TooFullError(Exception):
 
 class UploadResults:
     implements(IUploadResults)
+    uri = None
+    sharemap = None # dict of shnum to placement string
+    servermap = None # dict of peerid to set(shnums)
+    def __init__(self):
+        self.timings = {} # dict of name to number of seconds
+        self.rates = {} # dict of name to rates (in bytes per second)
 
 # our current uri_extension is 846 bytes for small files, a few bytes
 # more for larger ones (since the filesize is encoded in decimal in a
@@ -551,6 +557,7 @@ class CHKUploader:
         self._default_encoding_parameters = default_encoding_parameters
         self._log_number = self._client.log("CHKUploader starting")
         self._encoder = None
+        self._results = UploadResults()
 
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
@@ -565,6 +572,7 @@ class CHKUploader:
         This method returns a Deferred that will fire with the URI (a
         string)."""
 
+        self._started = time.time()
         uploadable = IUploadable(uploadable)
         self.log("starting upload of %s" % uploadable)
 
@@ -608,9 +616,14 @@ class CHKUploader:
         num_segments = encoder.get_param("num_segments")
         k,desired,n = encoder.get_param("share_counts")
 
+        self._peer_selection_started = time.time()
         d = peer_selector.get_shareholders(self._client, storage_index,
                                            share_size, block_size,
                                            num_segments, n, desired)
+        def _done(res):
+            self._peer_selection_finished = time.time()
+            return res
+        d.addCallback(_done)
         return d
 
     def set_shareholders(self, used_peers, encoder):
@@ -618,11 +631,14 @@ class CHKUploader:
         @param used_peers: a sequence of PeerTracker objects
         """
         self.log("_send_shares, used_peers is %s" % (used_peers,))
+        self._sharemap = {}
         for peer in used_peers:
             assert isinstance(peer, PeerTracker)
         buckets = {}
         for peer in used_peers:
             buckets.update(peer.buckets)
+            for shnum in peer.buckets:
+                self._sharemap[shnum] = peer
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
         encoder.set_shareholders(buckets)
 
@@ -635,9 +651,27 @@ class CHKUploader:
                            total_shares=total_shares,
                            size=size,
                            )
-        results = UploadResults()
-        results.uri = u.to_string()
-        return results
+        r = self._results
+        r.uri = u.to_string()
+        r.sharemap = {}
+        r.servermap = {}
+        for shnum in self._encoder.get_shares_placed():
+            peer_tracker = self._sharemap[shnum]
+            peerid = peer_tracker.peerid
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
+            if peerid not in r.servermap:
+                r.servermap[peerid] = set()
+            r.servermap[peerid].add(shnum)
+        peer_selection_time = (self._peer_selection_finished
+                               - self._peer_selection_started)
+        now = time.time()
+        r.timings["total"] = now - self._started
+        r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"]
+        r.timings["peer_selection"] = peer_selection_time
+        r.timings.update(self._encoder.get_times())
+        r.rates.update(self._encoder.get_rates())
+        return r
 
 
 def read_this_many_bytes(uploadable, size, prepend_data=[]):
@@ -661,6 +695,7 @@ class LiteralUploader:
 
     def __init__(self, client):
         self._client = client
+        self._results = UploadResults()
 
     def set_params(self, encoding_parameters):
         pass
@@ -675,9 +710,8 @@ class LiteralUploader:
         return d
 
     def _build_results(self, uri):
-        results = UploadResults()
-        results.uri = uri
-        return results
+        self._results.uri = uri
+        return self._results
 
     def close(self):
         pass
@@ -760,6 +794,7 @@ class AssistedUploader:
         assert isinstance(default_encoding_parameters, dict)
         self._default_encoding_parameters = default_encoding_parameters
         self._log_number = log.msg("AssistedUploader starting")
+        self._results = UploadResults()
 
     def log(self, msg, parent=None, **kwargs):
         if parent is None:
@@ -767,6 +802,7 @@ class AssistedUploader:
         return log.msg(msg, parent=parent, **kwargs)
 
     def start(self, uploadable):
+        self._started = time.time()
         u = IUploadable(uploadable)
         eu = EncryptAnUploadable(u, self._default_encoding_parameters)
         self._encuploadable = eu
@@ -802,11 +838,16 @@ class AssistedUploader:
         self._storage_index = storage_index
 
     def _contact_helper(self, res):
+        now = self._time_contacting_helper = time.time()
+        self._results.timings["local_hashing"] = now - self._started
         self.log("contacting helper..")
         d = self._helper.callRemote("upload_chk", self._storage_index)
         d.addCallback(self._contacted_helper)
         return d
     def _contacted_helper(self, (upload_results, upload_helper)):
+        now = time.time()
+        elapsed = now - self._time_contacting_helper
+        self._results.timings["contacting_helper"] = elapsed
         if upload_helper:
             self.log("helper says we need to upload")
             # we need to upload the file
@@ -849,9 +890,12 @@ class AssistedUploader:
                            total_shares=self._total_shares,
                            size=self._size,
                            )
-        results = UploadResults()
-        results.uri = u.to_string()
-        return results
+        r = self._results
+        r.uri = u.to_string()
+        now = time.time()
+        r.timings["total"] = now - self._started
+        r.rates["total"] = 1.0 * self._size / r.timings["total"]
+        return r
 
 class NoParameterPreferencesMixin:
     max_segment_size = None
index 0de727b3e22c7874c87c869404745a4841a6dd00..6f9149d3768f9c0b39c4bb52aa51b72e67a6b061 100644 (file)
 <ul>
   <li>URI: <tt><span n:render="string" n:data="uri" /></tt></li>
   <li>Download link: <span n:render="download_link" /></li>
+  <li>Sharemap: <span n:render="sharemap" /></li>
+  <li>Servermap: <span n:render="servermap" /></li>
+  <li>Timings:</li>
+  <ul>
+    <li>Total: <span n:render="time" n:data="time_total" />
+     (<span n:render="rate" n:data="rate_total" />)</li>
+    <ul>
+      <li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
+      <li>Encode And Push: <span n:render="time" n:data="time_total_encode_and_push" /></li>
+      <ul>
+        <li>Cumulative Encoding: <span n:render="time" n:data="time_cumulative_encoding" />
+        (<span n:render="rate" n:data="rate_encode" />)</li>
+        <li>Cumulative Pushing: <span n:render="time" n:data="time_cumulative_sending" />
+        (<span n:render="rate" n:data="rate_push" />)</li>
+        <li>Send Hashes And Close: <span n:render="time" n:data="time_hashes_and_close" /></li>
+      </ul>
+    </ul>
+  </ul>
 </ul>
 
 <div>Return to the <a href="/">Welcome Page</a></div>
index e5e824ef6fd6f44ceb8ca089f10f6c2d6a858193..e5b8126a0042726a35db6a544b1bb50e604ec448 100644 (file)
@@ -1275,6 +1275,104 @@ class UnlinkedPOSTCHKUploader(rend.Page):
                       ["/uri/" + res.uri])
         return d
 
+    def render_sharemap(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.sharemap)
+        def _render(sharemap):
+            if sharemap is None:
+                return "None"
+            l = T.ul()
+            for shnum in sorted(sharemap.keys()):
+                l[T.li["%d -> %s" % (shnum, sharemap[shnum])]]
+            return l
+        d.addCallback(_render)
+        return d
+
+    def render_servermap(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.servermap)
+        def _render(servermap):
+            if servermap is None:
+                return "None"
+            l = T.ul()
+            for peerid in sorted(servermap.keys()):
+                peerid_s = idlib.shortnodeid_b2a(peerid)
+                shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
+                l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+            return l
+        d.addCallback(_render)
+        return d
+
+    def render_time(self, ctx, data):
+        # 1.23s, 790ms, 132us
+        if data is None:
+            return ""
+        s = float(data)
+        if s >= 1.0:
+            return "%.2fs" % s
+        if s >= 0.01:
+            return "%dms" % (1000*s)
+        if s >= 0.001:
+            return "%.1fms" % (1000*s)
+        return "%dus" % (1000000*s)
+
+    def render_rate(self, ctx, data):
+        # 21.8kBps, 554.4kBps 4.37MBps
+        if data is None:
+            return ""
+        r = float(data)
+        if r > 1000000:
+            return "%1.2fMBps" % (r/1000000)
+        if r > 1000:
+            return "%.1fkBps" % (r/1000)
+        return "%dBps" % r
+
+    def data_time_total(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.timings.get("total"))
+        return d
+
+    def data_time_peer_selection(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.timings.get("peer_selection"))
+        return d
+
+    def data_time_total_encode_and_push(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.timings.get("total_encode_and_push"))
+        return d
+
+    def data_time_cumulative_encoding(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.timings.get("cumulative_encoding"))
+        return d
+
+    def data_time_cumulative_sending(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.timings.get("cumulative_sending"))
+        return d
+
+    def data_time_hashes_and_close(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.timings.get("hashes_and_close"))
+        return d
+
+    def data_rate_total(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.rates.get("total"))
+        return d
+
+    def data_rate_encode(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.rates.get("encode"))
+        return d
+
+    def data_rate_push(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.rates.get("push"))
+        return d
+
+
 class UnlinkedPOSTSSKUploader(rend.Page):
     def renderHTTP(self, ctx):
         req = inevow.IRequest(ctx)