def __init__(self):
self.timings = {}
self.timings["fetch_per_server"] = {}
+ self.timings["cumulative_verify"] = 0.0
self.sharemap = {}
self.problems = {}
self.active = True
if verinfo not in self._valid_versions:
# it's a new pair. Verify the signature.
+ started = time.time()
valid = self._pubkey.verify(prefix, signature)
+ # this records the total verification time for all versions we've
+ # seen. This time is included in "fetch".
+ elapsed = time.time() - started
+ self._status.timings["cumulative_verify"] += elapsed
+
if not valid:
self._status.problems[peerid] = "sh#%d: invalid signature" % shnum
raise CorruptShareError(peerid, shnum,
"signature is invalid")
+
# ok, it's a valid verinfo. Add it to the list of validated
# versions.
self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
statusid_counter = count(0)
def __init__(self):
self.timings = {}
- self.sharemap = None
+ self.timings["per_server"] = {}
+ self.privkey_from = None
+ self.peers_queried = None
+ self.sharemap = None # DictOfSets
+ self.problems = {}
self.active = True
self.storage_index = None
self.helper = False
+ self.encoding = ("?", "?")
+ self.initial_read_size = None
self.size = None
self.status = "Not started"
self.progress = 0.0
self.counter = self.statusid_counter.next()
self.started = time.time()
+ def add_per_server_time(self, peerid, op, elapsed):
+ assert op in ("read", "write")
+ if peerid not in self.timings["per_server"]:
+ self.timings["per_server"][peerid] = []
+ self.timings["per_server"][peerid].append((op,elapsed))
+
def get_started(self):
return self.started
def get_storage_index(self):
return self.storage_index
+ def get_encoding(self):
+ return self.encoding
def using_helper(self):
return self.helper
def get_size(self):
self.storage_index = si
def set_helper(self, helper):
self.helper = helper
+ def set_encoding(self, k, n):
+ self.encoding = (k, n)
def set_size(self, size):
self.size = size
def set_status(self, status):
self._status.set_helper(False)
self._status.set_progress(0.0)
self._status.set_active(True)
+ self._started = time.time()
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
# 5: when enough responses are back, we're done
self.log("starting publish, datalen is %s" % len(newdata))
- self._started = time.time()
self._status.set_size(len(newdata))
self._writekey = self._node.get_writekey()
required_shares = self._node.get_required_shares()
total_shares = self._node.get_total_shares()
self._pubkey = self._node.get_pubkey()
+ self._status.set_encoding(required_shares, total_shares)
# these two may not be, we might have to get them from the first peer
self._privkey = self._node.get_privkey()
# with up to 7 entries, allowing us to make an update in 2 RTT
# instead of 3.
self._read_size = 1000
+ self._status.initial_read_size = self._read_size
d = defer.succeed(total_shares)
d.addCallback(self._query_peers)
+ d.addCallback(self._query_peers_done)
d.addCallback(self._obtain_privkey)
+ d.addCallback(self._obtain_privkey_done)
d.addCallback(self._encrypt_and_encode, newdata, readkey, IV,
required_shares, total_shares)
def _query_peers(self, total_shares):
self.log("_query_peers")
+ self._query_peers_started = now = time.time()
+ elapsed = now - self._started
+ self._status.timings["setup"] = elapsed
storage_index = self._storage_index
EPSILON = total_shares / 2
#partial_peerlist = islice(peerlist, total_shares + EPSILON)
partial_peerlist = peerlist[:total_shares+EPSILON]
+ self._status.peers_queried = len(partial_peerlist)
self._storage_servers = {}
+ started = time.time()
dl = []
for permutedid, (peerid, ss) in enumerate(partial_peerlist):
self._storage_servers[peerid] = ss
d = self._do_query(ss, peerid, storage_index)
d.addCallback(self._got_query_results,
peerid, permutedid,
- reachable_peers, current_share_peers)
+ reachable_peers, current_share_peers, started)
dl.append(d)
d = defer.DeferredList(dl)
d.addCallback(self._got_all_query_results,
return d
def _got_query_results(self, datavs, peerid, permutedid,
- reachable_peers, current_share_peers):
+ reachable_peers, current_share_peers, started):
lp = self.log(format="_got_query_results from %(peerid)s",
peerid=idlib.shortnodeid_b2a(peerid))
+ elapsed = time.time() - started
+ self._status.add_per_server_time(peerid, "read", elapsed)
+
assert isinstance(datavs, dict)
reachable_peers[peerid] = permutedid
if not datavs:
total_shares, reachable_peers,
current_share_peers):
self.log("_got_all_query_results")
+
# now that we know everything about the shares currently out there,
# decide where to place the new shares.
target_info = (target_map, shares_per_peer)
return target_info
+ def _query_peers_done(self, target_info):
+ self._obtain_privkey_started = now = time.time()
+ elapsed = time.time() - self._query_peers_started
+ self._status.timings["query"] = elapsed
+ return target_info
+
def _obtain_privkey(self, target_info):
# make sure we've got a copy of our private key.
if self._privkey:
# Must have picked it up during _query_peers. We're good to go.
+ if "privkey_fetch" not in self._status.timings:
+ self._status.timings["privkey_fetch"] = 0.0
return target_info
# Nope, we haven't managed to grab a copy, and we still need it. Ask
# peers one at a time until we get a copy. Only bother asking peers
# who've admitted to holding a share.
+ self._privkey_fetch_started = time.time()
target_map, shares_per_peer = target_info
# pull shares from self._encprivkey_shares
if not self._encprivkey_shares:
return d
def _do_privkey_query(self, rref, peerid, shnum, offset, length):
+ started = time.time()
d = rref.callRemote("slot_readv", self._storage_index,
[shnum], [(offset, length)] )
- d.addCallback(self._privkey_query_response, peerid, shnum)
+ d.addCallback(self._privkey_query_response, peerid, shnum, started)
return d
- def _privkey_query_response(self, datav, peerid, shnum):
+ def _privkey_query_response(self, datav, peerid, shnum, started):
+ elapsed = time.time() - started
+ self._status.add_per_server_time(peerid, "read", elapsed)
+
data = datav[shnum][0]
self._try_to_validate_privkey(data, peerid, shnum)
+ elapsed = time.time() - self._privkey_fetch_started
+ self._status.timings["privkey_fetch"] = elapsed
+ self._status.privkey_from = peerid
+
+ def _obtain_privkey_done(self, target_info):
+ elapsed = time.time() - self._obtain_privkey_started
+ self._status.timings["privkey"] = elapsed
+ return target_info
+
def _encrypt_and_encode(self, target_info,
newdata, readkey, IV,
required_shares, total_shares):
self.log("_encrypt_and_encode")
+ started = time.time()
+
key = hashutil.ssk_readkey_data_hash(IV, readkey)
enc = AES(key)
crypttext = enc.process(newdata)
assert len(crypttext) == len(newdata)
+ now = time.time()
+ self._status.timings["encrypt"] = now - started
+ started = now
+
# now apply FEC
self.MAX_SEGMENT_SIZE = 1024*1024
data_length = len(crypttext)
assert len(piece) == piece_size
d = fec.encode(crypttext_pieces)
+ def _done_encoding(res):
+ elapsed = time.time() - started
+ self._status.timings["encode"] = elapsed
+ return res
+ d.addCallback(_done_encoding)
d.addCallback(lambda shares_and_shareids:
(shares_and_shareids,
required_shares, total_shares,
target_info),
seqnum, IV):
self.log("_generate_shares")
+ started = time.time()
# we should know these by now
privkey = self._privkey
# then they all share the same encprivkey at the end. The sizes
# of everything are the same for all shares.
+ sign_started = time.time()
signature = privkey.sign(prefix)
+ self._status.timings["sign"] = time.time() - sign_started
verification_key = pubkey.serialize()
all_shares[shnum],
encprivkey)
final_shares[shnum] = final_share
+ elapsed = time.time() - started
+ self._status.timings["pack"] = elapsed
return (seqnum, root_hash, final_shares, target_info)
def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV):
self.log("_send_shares")
+ started = time.time()
+
# we're finally ready to send out our shares. If we encounter any
# surprises here, it's because somebody else is writing at the same
# time. (Note: in the future, when we remove the _query_peers() step
d = self._do_testreadwrite(peerid, secrets,
tw_vectors, read_vector)
d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
- peerid, expected_old_shares[peerid], dispatch_map)
+ peerid, expected_old_shares[peerid], dispatch_map,
+ started)
dl.append(d)
d = defer.DeferredList(dl)
+ def _done_sending(res):
+ elapsed = time.time() - started
+ self._status.timings["push"] = elapsed
+ self._status.sharemap = dispatch_map
+ return res
+ d.addCallback(_done_sending)
d.addCallback(lambda res: (self._surprised, dispatch_map))
return d
def _got_write_answer(self, answer, tw_vectors, my_checkstring,
peerid, expected_old_shares,
- dispatch_map):
+ dispatch_map, started):
lp = self.log("_got_write_answer from %s" %
idlib.shortnodeid_b2a(peerid))
+ elapsed = time.time() - started
+ self._status.add_per_server_time(peerid, "write", elapsed)
+
wrote, read_data = answer
surprised = False
def data_rate_fetch(self, ctx, data):
return self._get_rate(data, "fetch")
+ def data_time_cumulative_verify(self, ctx, data):
+ return self.retrieve_status.timings.get("cumulative_verify")
+
def data_time_decode(self, ctx, data):
return self.retrieve_status.timings.get("decode")
def data_rate_decode(self, ctx, data):
return T.li["Per-Server Fetch Response Times: ", l]
-class PublishStatusPage(rend.Page):
+class PublishStatusPage(rend.Page, RateAndTimeMixin):
docFactory = getxmlfile("publish-status.xhtml")
+ def __init__(self, data):
+ rend.Page.__init__(self, data)
+ self.publish_status = data
+
def render_started(self, ctx, data):
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
started_s = time.strftime(TIME_FORMAT,
def render_status(self, ctx, data):
return data.get_status()
+ def render_encoding(self, ctx, data):
+ k, n = data.get_encoding()
+ return ctx.tag["Encoding: %s of %s" % (k, n)]
+
+ def render_peers_queried(self, ctx, data):
+ return ctx.tag["Peers Queried: ", data.peers_queried]
+
+ def render_sharemap(self, ctx, data):
+ sharemap = data.sharemap
+ if sharemap is None:
+ return ctx.tag["None"]
+ l = T.ul()
+ for shnum in sorted(sharemap.keys()):
+ l[T.li["%d -> Placed on " % shnum,
+ ", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
+ for (peerid,seqnum,root_hash)
+ in sharemap[shnum]])]]
+ return ctx.tag["Sharemap:", l]
+
+ def render_problems(self, ctx, data):
+ problems = data.problems
+ if not problems:
+ return ""
+ l = T.ul()
+ for peerid in sorted(problems.keys()):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]]
+ return ctx.tag["Server Problems:", l]
+
+ def _get_rate(self, data, name):
+ file_size = self.publish_status.get_size()
+ time = self.publish_status.timings.get(name)
+ if time is None:
+ return None
+ try:
+ return 1.0 * file_size / time
+ except ZeroDivisionError:
+ return None
+
+ def data_time_total(self, ctx, data):
+ return self.publish_status.timings.get("total")
+ def data_rate_total(self, ctx, data):
+ return self._get_rate(data, "total")
+
+ def data_time_setup(self, ctx, data):
+ return self.publish_status.timings.get("setup")
+
+ def data_time_query(self, ctx, data):
+ return self.publish_status.timings.get("query")
+
+ def data_time_privkey(self, ctx, data):
+ return self.publish_status.timings.get("privkey")
+
+ def data_time_privkey_fetch(self, ctx, data):
+ return self.publish_status.timings.get("privkey_fetch")
+ def render_privkey_from(self, ctx, data):
+ peerid = data.privkey_from
+ if peerid:
+ return " (got from [%s])" % idlib.shortnodeid_b2a(peerid)
+ else:
+ return ""
+
+ def data_time_encrypt(self, ctx, data):
+ return self.publish_status.timings.get("encrypt")
+ def data_rate_encrypt(self, ctx, data):
+ return self._get_rate(data, "encrypt")
+
+ def data_time_encode(self, ctx, data):
+ return self.publish_status.timings.get("encode")
+ def data_rate_encode(self, ctx, data):
+ return self._get_rate(data, "encode")
+
+ def data_time_pack(self, ctx, data):
+ return self.publish_status.timings.get("pack")
+ def data_rate_pack(self, ctx, data):
+ return self._get_rate(data, "pack")
+ def data_time_sign(self, ctx, data):
+ return self.publish_status.timings.get("sign")
+
+ def data_time_push(self, ctx, data):
+ return self.publish_status.timings.get("push")
+ def data_rate_push(self, ctx, data):
+ return self._get_rate(data, "push")
+
+ def data_initial_read_size(self, ctx, data):
+ return self.publish_status.initial_read_size
+
+ def render_server_timings(self, ctx, data):
+ per_server = self.publish_status.timings.get("per_server")
+ if not per_server:
+ return ""
+ l = T.ul()
+ for peerid in sorted(per_server.keys()):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ times = []
+ for op,t in per_server[peerid]:
+ if op == "read":
+ times.append( "(" + self.render_time(None, t) + ")" )
+ else:
+ times.append( self.render_time(None, t) )
+ times_s = ", ".join(times)
+ l[T.li["[%s]: %s" % (peerid_s, times_s)]]
+ return T.li["Per-Server Response Times: ", l]
+
+
class Status(rend.Page):
docFactory = getxmlfile("status.xhtml")
addSlash = True