# -*- test-case-name: allmydata.test.test_encode -*-
+import time
from zope.interface import implements
from twisted.internet import defer
from foolscap import eventual
# that we sent to that landlord.
self.share_root_hashes = [None] * self.num_shares
+ self._times = {
+ "cumulative_encoding": 0.0,
+ "cumulative_sending": 0.0,
+ "hashes_and_close": 0.0,
+ "total_encode_and_push": 0.0,
+ }
+ self._start_total_timestamp = time.time()
+
d = eventual.fireEventually()
d.addCallback(lambda res: self.start_all_shareholders())
def _encode_segment(self, segnum):
codec = self._codec
+ start = time.time()
# the ICodecEncoder API wants to receive a total of self.segment_size
# bytes on each encode() call, broken up into a number of
d = self._gather_data(self.required_shares, input_piece_size,
crypttext_segment_hasher)
- def _done(chunks):
+ def _done_gathering(chunks):
for c in chunks:
assert len(c) == input_piece_size
self._crypttext_hashes.append(crypttext_segment_hasher.digest())
# during this call, we hit 5*segsize memory
return codec.encode(chunks)
+ d.addCallback(_done_gathering)
+ def _done(res):
+ elapsed = time.time() - start
+ self._times["cumulative_encoding"] += elapsed
+ return res
d.addCallback(_done)
return d
def _encode_tail_segment(self, segnum):
+ start = time.time()
codec = self._tail_codec
input_piece_size = codec.get_block_size()
d = self._gather_data(self.required_shares, input_piece_size,
crypttext_segment_hasher,
allow_short=True)
- def _done(chunks):
+ def _done_gathering(chunks):
for c in chunks:
# a short trailing chunk will have been padded by
# _gather_data
assert len(c) == input_piece_size
self._crypttext_hashes.append(crypttext_segment_hasher.digest())
return codec.encode(chunks)
+ d.addCallback(_done_gathering)
+ def _done(res):
+ elapsed = time.time() - start
+ self._times["cumulative_encoding"] += elapsed
+ return res
d.addCallback(_done)
return d
# *doesn't* have a share, that's an error.
_assert(set(self.landlords.keys()).issubset(set(shareids)),
shareids=shareids, landlords=self.landlords)
+ start = time.time()
dl = []
lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
for i in range(len(shares)):
100 * (segnum+1) / self.num_segments,
),
level=log.OPERATIONAL)
+ elapsed = time.time() - start
+ self._times["cumulative_sending"] += elapsed
return res
dl.addCallback(_logit)
return dl
return d
def finish_hashing(self):
+ self._start_hashing_and_close_timestamp = time.time()
crypttext_hash = self._crypttext_hasher.digest()
self.uri_extension_data["crypttext_hash"] = crypttext_hash
d = self._uploadable.get_plaintext_hash()
def done(self):
self.log("upload done", level=log.OPERATIONAL)
+ now = time.time()
+ h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
+ self._times["hashes_and_close"] = h_and_c_elapsed
+ total_elapsed = now - self._start_total_timestamp
+ self._times["total_encode_and_push"] = total_elapsed
+
+ # update our sharemap
+ self._shares_placed = set(self.landlords.keys())
return (self.uri_extension_hash, self.required_shares,
self.num_shares, self.file_size)
return f
d.addCallback(_done)
return d
+
+ def get_shares_placed(self):
+ # return a set of share numbers that were successfully placed.
+ return self._shares_placed
+
+ def get_times(self):
+ # return a dictionary of encode+push timings
+ return self._times
+ def get_rates(self):
+ # return a dictionary of encode+push speeds
+ rates = {
+ "encode": self.file_size / self._times["cumulative_encoding"],
+ "push": self.file_size / self._times["cumulative_sending"],
+ }
+ return rates
-import os
+import os, time
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
class UploadResults:
implements(IUploadResults)
+ uri = None
+ sharemap = None # dict of shnum to placement string
+ servermap = None # dict of peerid to set(shnums)
+ def __init__(self):
+ self.timings = {} # dict of name to number of seconds
+ self.rates = {} # dict of name to rates (in bytes per second)
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
self._default_encoding_parameters = default_encoding_parameters
self._log_number = self._client.log("CHKUploader starting")
self._encoder = None
+ self._results = UploadResults()
def log(self, *args, **kwargs):
if "parent" not in kwargs:
This method returns a Deferred that will fire with the URI (a
string)."""
+ self._started = time.time()
uploadable = IUploadable(uploadable)
self.log("starting upload of %s" % uploadable)
num_segments = encoder.get_param("num_segments")
k,desired,n = encoder.get_param("share_counts")
+ self._peer_selection_started = time.time()
d = peer_selector.get_shareholders(self._client, storage_index,
share_size, block_size,
num_segments, n, desired)
+ def _done(res):
+ self._peer_selection_finished = time.time()
+ return res
+ d.addCallback(_done)
return d
def set_shareholders(self, used_peers, encoder):
@param used_peers: a sequence of PeerTracker objects
"""
self.log("_send_shares, used_peers is %s" % (used_peers,))
+ self._sharemap = {}
for peer in used_peers:
assert isinstance(peer, PeerTracker)
buckets = {}
for peer in used_peers:
buckets.update(peer.buckets)
+ for shnum in peer.buckets:
+ self._sharemap[shnum] = peer
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
encoder.set_shareholders(buckets)
total_shares=total_shares,
size=size,
)
- results = UploadResults()
- results.uri = u.to_string()
- return results
+ r = self._results
+ r.uri = u.to_string()
+ r.sharemap = {}
+ r.servermap = {}
+ for shnum in self._encoder.get_shares_placed():
+ peer_tracker = self._sharemap[shnum]
+ peerid = peer_tracker.peerid
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ r.sharemap[shnum] = "Placed on [%s]" % peerid_s
+ if peerid not in r.servermap:
+ r.servermap[peerid] = set()
+ r.servermap[peerid].add(shnum)
+ peer_selection_time = (self._peer_selection_finished
+ - self._peer_selection_started)
+ now = time.time()
+ r.timings["total"] = now - self._started
+ r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"]
+ r.timings["peer_selection"] = peer_selection_time
+ r.timings.update(self._encoder.get_times())
+ r.rates.update(self._encoder.get_rates())
+ return r
def read_this_many_bytes(uploadable, size, prepend_data=[]):
def __init__(self, client):
self._client = client
+ self._results = UploadResults()
def set_params(self, encoding_parameters):
pass
return d
def _build_results(self, uri):
- results = UploadResults()
- results.uri = uri
- return results
+ self._results.uri = uri
+ return self._results
def close(self):
pass
assert isinstance(default_encoding_parameters, dict)
self._default_encoding_parameters = default_encoding_parameters
self._log_number = log.msg("AssistedUploader starting")
+ self._results = UploadResults()
def log(self, msg, parent=None, **kwargs):
if parent is None:
return log.msg(msg, parent=parent, **kwargs)
def start(self, uploadable):
+ self._started = time.time()
u = IUploadable(uploadable)
eu = EncryptAnUploadable(u, self._default_encoding_parameters)
self._encuploadable = eu
self._storage_index = storage_index
def _contact_helper(self, res):
+ now = self._time_contacting_helper = time.time()
+ self._results.timings["local_hashing"] = now - self._started
self.log("contacting helper..")
d = self._helper.callRemote("upload_chk", self._storage_index)
d.addCallback(self._contacted_helper)
return d
def _contacted_helper(self, (upload_results, upload_helper)):
+ now = time.time()
+ elapsed = now - self._time_contacting_helper
+ self._results.timings["contacting_helper"] = elapsed
if upload_helper:
self.log("helper says we need to upload")
# we need to upload the file
total_shares=self._total_shares,
size=self._size,
)
- results = UploadResults()
- results.uri = u.to_string()
- return results
+ r = self._results
+ r.uri = u.to_string()
+ now = time.time()
+ r.timings["total"] = now - self._started
+ r.rates["total"] = 1.0 * self._size / r.timings["total"]
+ return r
class NoParameterPreferencesMixin:
max_segment_size = None
<ul>
<li>URI: <tt><span n:render="string" n:data="uri" /></tt></li>
<li>Download link: <span n:render="download_link" /></li>
+ <li>Sharemap: <span n:render="sharemap" /></li>
+ <li>Servermap: <span n:render="servermap" /></li>
+ <li>Timings:</li>
+ <ul>
+ <li>Total: <span n:render="time" n:data="time_total" />
+ (<span n:render="rate" n:data="rate_total" />)</li>
+ <ul>
+ <li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
+ <li>Encode And Push: <span n:render="time" n:data="time_total_encode_and_push" /></li>
+ <ul>
+ <li>Cumulative Encoding: <span n:render="time" n:data="time_cumulative_encoding" />
+ (<span n:render="rate" n:data="rate_encode" />)</li>
+ <li>Cumulative Pushing: <span n:render="time" n:data="time_cumulative_sending" />
+ (<span n:render="rate" n:data="rate_push" />)</li>
+ <li>Send Hashes And Close: <span n:render="time" n:data="time_hashes_and_close" /></li>
+ </ul>
+ </ul>
+ </ul>
</ul>
<div>Return to the <a href="/">Welcome Page</a></div>
["/uri/" + res.uri])
return d
+ def render_sharemap(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.sharemap)
+ def _render(sharemap):
+ if sharemap is None:
+ return "None"
+ l = T.ul()
+ for shnum in sorted(sharemap.keys()):
+ l[T.li["%d -> %s" % (shnum, sharemap[shnum])]]
+ return l
+ d.addCallback(_render)
+ return d
+
+ def render_servermap(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.servermap)
+ def _render(servermap):
+ if servermap is None:
+ return "None"
+ l = T.ul()
+ for peerid in sorted(servermap.keys()):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
+ l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+ return l
+ d.addCallback(_render)
+ return d
+
+ def render_time(self, ctx, data):
+ # 1.23s, 790ms, 132us
+ if data is None:
+ return ""
+ s = float(data)
+ if s >= 1.0:
+ return "%.2fs" % s
+ if s >= 0.01:
+ return "%dms" % (1000*s)
+ if s >= 0.001:
+ return "%.1fms" % (1000*s)
+ return "%dus" % (1000000*s)
+
+ def render_rate(self, ctx, data):
+ # 21.8kBps, 554.4kBps 4.37MBps
+ if data is None:
+ return ""
+ r = float(data)
+ if r > 1000000:
+ return "%1.2fMBps" % (r/1000000)
+ if r > 1000:
+ return "%.1fkBps" % (r/1000)
+ return "%dBps" % r
+
+ def data_time_total(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.timings.get("total"))
+ return d
+
+ def data_time_peer_selection(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.timings.get("peer_selection"))
+ return d
+
+ def data_time_total_encode_and_push(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.timings.get("total_encode_and_push"))
+ return d
+
+ def data_time_cumulative_encoding(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.timings.get("cumulative_encoding"))
+ return d
+
+ def data_time_cumulative_sending(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.timings.get("cumulative_sending"))
+ return d
+
+ def data_time_hashes_and_close(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.timings.get("hashes_and_close"))
+ return d
+
+ def data_rate_total(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.rates.get("total"))
+ return d
+
+ def data_rate_encode(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.rates.get("encode"))
+ return d
+
+ def data_rate_push(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.rates.get("push"))
+ return d
+
+
class UnlinkedPOSTSSKUploader(rend.Page):
def renderHTTP(self, ctx):
req = inevow.IRequest(ctx)