From: Brian Warner Date: Wed, 6 Feb 2008 07:41:51 +0000 (-0700) Subject: add upload timings and rates to the POST /uri?t=upload results page X-Git-Tag: allmydata-tahoe-0.8.0~132 X-Git-Url: https://git.rkrishnan.org/pf/using.html?a=commitdiff_plain;h=93d45abb023d5e2e6e3538142f54fbdb3c74e99f;p=tahoe-lafs%2Ftahoe-lafs.git add upload timings and rates to the POST /uri?t=upload results page --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index 66c05daa..39f1e904 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -1,5 +1,6 @@ # -*- test-case-name: allmydata.test.test_encode -*- +import time from zope.interface import implements from twisted.internet import defer from foolscap import eventual @@ -207,6 +208,14 @@ class Encoder(object): # that we sent to that landlord. self.share_root_hashes = [None] * self.num_shares + self._times = { + "cumulative_encoding": 0.0, + "cumulative_sending": 0.0, + "hashes_and_close": 0.0, + "total_encode_and_push": 0.0, + } + self._start_total_timestamp = time.time() + d = eventual.fireEventually() d.addCallback(lambda res: self.start_all_shareholders()) @@ -269,6 +278,7 @@ class Encoder(object): def _encode_segment(self, segnum): codec = self._codec + start = time.time() # the ICodecEncoder API wants to receive a total of self.segment_size # bytes on each encode() call, broken up into a number of @@ -297,17 +307,23 @@ class Encoder(object): d = self._gather_data(self.required_shares, input_piece_size, crypttext_segment_hasher) - def _done(chunks): + def _done_gathering(chunks): for c in chunks: assert len(c) == input_piece_size self._crypttext_hashes.append(crypttext_segment_hasher.digest()) # during this call, we hit 5*segsize memory return codec.encode(chunks) + d.addCallback(_done_gathering) + def _done(res): + elapsed = time.time() - start + self._times["cumulative_encoding"] += elapsed + return res d.addCallback(_done) return d def _encode_tail_segment(self, segnum): + start = time.time() codec = self._tail_codec input_piece_size = codec.get_block_size() @@ -316,13 +332,18 @@ class Encoder(object): d = self._gather_data(self.required_shares, input_piece_size, crypttext_segment_hasher, allow_short=True) - def _done(chunks): + def _done_gathering(chunks): for c in chunks: # a short trailing chunk will have been padded by # _gather_data assert len(c) == input_piece_size self._crypttext_hashes.append(crypttext_segment_hasher.digest()) return codec.encode(chunks) + d.addCallback(_done_gathering) + def _done(res): + elapsed = time.time() - start + self._times["cumulative_encoding"] += elapsed + return res d.addCallback(_done) return d @@ -386,6 +407,7 @@ class Encoder(object): # *doesn't* have a share, that's an error. _assert(set(self.landlords.keys()).issubset(set(shareids)), shareids=shareids, landlords=self.landlords) + start = time.time() dl = [] lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY) for i in range(len(shares)): @@ -410,6 +432,8 @@ class Encoder(object): 100 * (segnum+1) / self.num_segments, ), level=log.OPERATIONAL) + elapsed = time.time() - start + self._times["cumulative_sending"] += elapsed return res dl.addCallback(_logit) return dl @@ -463,6 +487,7 @@ class Encoder(object): return d def finish_hashing(self): + self._start_hashing_and_close_timestamp = time.time() crypttext_hash = self._crypttext_hasher.digest() self.uri_extension_data["crypttext_hash"] = crypttext_hash d = self._uploadable.get_plaintext_hash() @@ -607,6 +632,14 @@ class Encoder(object): def done(self): self.log("upload done", level=log.OPERATIONAL) + now = time.time() + h_and_c_elapsed = now - self._start_hashing_and_close_timestamp + self._times["hashes_and_close"] = h_and_c_elapsed + total_elapsed = now - self._start_total_timestamp + self._times["total_encode_and_push"] = total_elapsed + + # update our sharemap + self._shares_placed = set(self.landlords.keys()) return (self.uri_extension_hash, self.required_shares, self.num_shares, self.file_size) @@ -628,3 +661,18 @@ class Encoder(object): return f d.addCallback(_done) return d + + def get_shares_placed(self): + # return a set of share numbers that were successfully placed. + return self._shares_placed + + def get_times(self): + # return a dictionary of encode+push timings + return self._times + def get_rates(self): + # return a dictionary of encode+push speeds + rates = { + "encode": self.file_size / self._times["cumulative_encoding"], + "push": self.file_size / self._times["cumulative_sending"], + } + return rates diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 659fb1b2..69a42c60 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -1,5 +1,5 @@ -import os +import os, time from zope.interface import implements from twisted.python import failure from twisted.internet import defer @@ -38,6 +38,12 @@ class TooFullError(Exception): class UploadResults: implements(IUploadResults) + uri = None + sharemap = None # dict of shnum to placement string + servermap = None # dict of peerid to set(shnums) + def __init__(self): + self.timings = {} # dict of name to number of seconds + self.rates = {} # dict of name to rates (in bytes per second) # our current uri_extension is 846 bytes for small files, a few bytes # more for larger ones (since the filesize is encoded in decimal in a @@ -551,6 +557,7 @@ class CHKUploader: self._default_encoding_parameters = default_encoding_parameters self._log_number = self._client.log("CHKUploader starting") self._encoder = None + self._results = UploadResults() def log(self, *args, **kwargs): if "parent" not in kwargs: @@ -565,6 +572,7 @@ class CHKUploader: This method returns a Deferred that will fire with the URI (a string).""" + self._started = time.time() uploadable = IUploadable(uploadable) self.log("starting upload of %s" % uploadable) @@ -608,9 +616,14 @@ class CHKUploader: num_segments = encoder.get_param("num_segments") k,desired,n = encoder.get_param("share_counts") + self._peer_selection_started = time.time() d = peer_selector.get_shareholders(self._client, storage_index, share_size, block_size, num_segments, n, desired) + def _done(res): + self._peer_selection_finished = time.time() + return res + d.addCallback(_done) return d def set_shareholders(self, used_peers, encoder): @@ -618,11 +631,14 @@ class CHKUploader: @param used_peers: a sequence of PeerTracker objects """ self.log("_send_shares, used_peers is %s" % (used_peers,)) + self._sharemap = {} for peer in used_peers: assert isinstance(peer, PeerTracker) buckets = {} for peer in used_peers: buckets.update(peer.buckets) + for shnum in peer.buckets: + self._sharemap[shnum] = peer assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]) encoder.set_shareholders(buckets) @@ -635,9 +651,27 @@ class CHKUploader: total_shares=total_shares, size=size, ) - results = UploadResults() - results.uri = u.to_string() - return results + r = self._results + r.uri = u.to_string() + r.sharemap = {} + r.servermap = {} + for shnum in self._encoder.get_shares_placed(): + peer_tracker = self._sharemap[shnum] + peerid = peer_tracker.peerid + peerid_s = idlib.shortnodeid_b2a(peerid) + r.sharemap[shnum] = "Placed on [%s]" % peerid_s + if peerid not in r.servermap: + r.servermap[peerid] = set() + r.servermap[peerid].add(shnum) + peer_selection_time = (self._peer_selection_finished + - self._peer_selection_started) + now = time.time() + r.timings["total"] = now - self._started + r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"] + r.timings["peer_selection"] = peer_selection_time + r.timings.update(self._encoder.get_times()) + r.rates.update(self._encoder.get_rates()) + return r def read_this_many_bytes(uploadable, size, prepend_data=[]): @@ -661,6 +695,7 @@ class LiteralUploader: def __init__(self, client): self._client = client + self._results = UploadResults() def set_params(self, encoding_parameters): pass @@ -675,9 +710,8 @@ class LiteralUploader: return d def _build_results(self, uri): - results = UploadResults() - results.uri = uri - return results + self._results.uri = uri + return self._results def close(self): pass @@ -760,6 +794,7 @@ class AssistedUploader: assert isinstance(default_encoding_parameters, dict) self._default_encoding_parameters = default_encoding_parameters self._log_number = log.msg("AssistedUploader starting") + self._results = UploadResults() def log(self, msg, parent=None, **kwargs): if parent is None: @@ -767,6 +802,7 @@ class AssistedUploader: return log.msg(msg, parent=parent, **kwargs) def start(self, uploadable): + self._started = time.time() u = IUploadable(uploadable) eu = EncryptAnUploadable(u, self._default_encoding_parameters) self._encuploadable = eu @@ -802,11 +838,16 @@ class AssistedUploader: self._storage_index = storage_index def _contact_helper(self, res): + now = self._time_contacting_helper = time.time() + self._results.timings["local_hashing"] = now - self._started self.log("contacting helper..") d = self._helper.callRemote("upload_chk", self._storage_index) d.addCallback(self._contacted_helper) return d def _contacted_helper(self, (upload_results, upload_helper)): + now = time.time() + elapsed = now - self._time_contacting_helper + self._results.timings["contacting_helper"] = elapsed if upload_helper: self.log("helper says we need to upload") # we need to upload the file @@ -849,9 +890,12 @@ class AssistedUploader: total_shares=self._total_shares, size=self._size, ) - results = UploadResults() - results.uri = u.to_string() - return results + r = self._results + r.uri = u.to_string() + now = time.time() + r.timings["total"] = now - self._started + r.rates["total"] = 1.0 * self._size / r.timings["total"] + return r class NoParameterPreferencesMixin: max_segment_size = None diff --git a/src/allmydata/web/unlinked-upload.xhtml b/src/allmydata/web/unlinked-upload.xhtml index 0de727b3..6f9149d3 100644 --- a/src/allmydata/web/unlinked-upload.xhtml +++ b/src/allmydata/web/unlinked-upload.xhtml @@ -14,6 +14,24 @@
Return to the Welcome Page
diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py index e5e824ef..e5b8126a 100644 --- a/src/allmydata/webish.py +++ b/src/allmydata/webish.py @@ -1275,6 +1275,104 @@ class UnlinkedPOSTCHKUploader(rend.Page): ["/uri/" + res.uri]) return d + def render_sharemap(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.sharemap) + def _render(sharemap): + if sharemap is None: + return "None" + l = T.ul() + for shnum in sorted(sharemap.keys()): + l[T.li["%d -> %s" % (shnum, sharemap[shnum])]] + return l + d.addCallback(_render) + return d + + def render_servermap(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.servermap) + def _render(servermap): + if servermap is None: + return "None" + l = T.ul() + for peerid in sorted(servermap.keys()): + peerid_s = idlib.shortnodeid_b2a(peerid) + shares_s = ",".join([str(shnum) for shnum in servermap[peerid]]) + l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]] + return l + d.addCallback(_render) + return d + + def render_time(self, ctx, data): + # 1.23s, 790ms, 132us + if data is None: + return "" + s = float(data) + if s >= 1.0: + return "%.2fs" % s + if s >= 0.01: + return "%dms" % (1000*s) + if s >= 0.001: + return "%.1fms" % (1000*s) + return "%dus" % (1000000*s) + + def render_rate(self, ctx, data): + # 21.8kBps, 554.4kBps 4.37MBps + if data is None: + return "" + r = float(data) + if r > 1000000: + return "%1.2fMBps" % (r/1000000) + if r > 1000: + return "%.1fkBps" % (r/1000) + return "%dBps" % r + + def data_time_total(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.timings.get("total")) + return d + + def data_time_peer_selection(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.timings.get("peer_selection")) + return d + + def data_time_total_encode_and_push(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.timings.get("total_encode_and_push")) + return d + + def data_time_cumulative_encoding(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.timings.get("cumulative_encoding")) + return d + + def data_time_cumulative_sending(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.timings.get("cumulative_sending")) + return d + + def data_time_hashes_and_close(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.timings.get("hashes_and_close")) + return d + + def data_rate_total(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.rates.get("total")) + return d + + def data_rate_encode(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.rates.get("encode")) + return d + + def data_rate_push(self, ctx, data): + d = self.upload_results() + d.addCallback(lambda res: res.rates.get("push")) + return d + + class UnlinkedPOSTSSKUploader(rend.Page): def renderHTTP(self, ctx): req = inevow.IRequest(ctx)