statusid_counter = count(0)
def __init__(self):
self.timings = {}
- self.sharemap = None
+ self.timings["fetch_per_server"] = {}
+ self.sharemap = {}
+ self.problems = {}
self.active = True
self.storage_index = None
self.helper = False
+ self.encoding = ("?","?")
+ self.search_distance = None
self.size = None
self.status = "Not started"
self.progress = 0.0
return self.started
def get_storage_index(self):
return self.storage_index
+ def get_encoding(self):
+ return self.encoding
+ def get_search_distance(self):
+ return self.search_distance
def using_helper(self):
return self.helper
def get_size(self):
self.storage_index = si
def set_helper(self, helper):
self.helper = helper
+ def set_encoding(self, k, n):
+ self.encoding = (k, n)
+ def set_search_distance(self, value):
+ self.search_distance = value
def set_size(self, size):
self.size = size
def set_status(self, status):
# the hashes over and over again.
self._valid_shares = {}
+ self._started = time.time()
self._done_deferred = defer.Deferred()
d = defer.succeed(initial_query_count)
def _choose_initial_peers(self, numqueries):
n = self._node
+ started = time.time()
full_peerlist = n._client.get_permuted_peers("storage",
self._storage_index)
# we later increase this limit, it may be useful to re-scan the
# permuted list.
self._peerlist_limit = numqueries
+ self._status.set_search_distance(len(self._peerlist))
+ elapsed = time.time() - started
+ self._status.timings["peer_selection"] = elapsed
return self._peerlist
def _send_initial_requests(self, peerlist):
+ self._first_query_sent = time.time()
self._bad_peerids = set()
self._running = True
self._queries_outstanding = set()
return None
def _do_query(self, ss, peerid, storage_index, readsize):
+ started = time.time()
self._queries_outstanding.add(peerid)
d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])
- d.addCallback(self._got_results, peerid, readsize, (ss, storage_index))
+ d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
+ started)
d.addErrback(self._query_failed, peerid)
# errors that aren't handled by _query_failed (and errors caused by
# _query_failed) get logged, but we still want to check for doneness.
verifier = rsa.create_verifying_key_from_string(pubkey_s)
return verifier
- def _got_results(self, datavs, peerid, readsize, stuff):
+ def _got_results(self, datavs, peerid, readsize, stuff, started):
+ elapsed = time.time() - started
+ if peerid not in self._status.timings["fetch_per_server"]:
+ self._status.timings["fetch_per_server"][peerid] = []
+ self._status.timings["fetch_per_server"][peerid].append(elapsed)
self._queries_outstanding.discard(peerid)
self._used_peers.add(peerid)
if not self._running:
return
+ if peerid not in self._status.sharemap:
+ self._status.sharemap[peerid] = set()
+
for shnum,datav in datavs.items():
data = datav[0]
try:
fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
assert len(fingerprint) == 32
if fingerprint != self._node._fingerprint:
+ self._status.problems[peerid] = "sh#%d: pubkey doesn't match fingerprint" % shnum
raise CorruptShareError(peerid, shnum,
"pubkey doesn't match fingerprint")
self._pubkey = self._deserialize_pubkey(pubkey_s)
self._node._populate_pubkey(self._pubkey)
verinfo = (seqnum, root_hash, IV, segsize, datalength)
+ self._status.sharemap[peerid].add(verinfo)
+
if verinfo not in self._valid_versions:
# it's a new pair. Verify the signature.
valid = self._pubkey.verify(prefix, signature)
if not valid:
+ self._status.problems[peerid] = "sh#%d: invalid signature" % shnum
raise CorruptShareError(peerid, shnum,
"signature is invalid")
# ok, it's a valid verinfo. Add it to the list of validated
# rest of the shares), we need to implement the refactoring mentioned
# above.
if k != self._required_shares:
+ self._status.problems[peerid] = "sh#%d: k=%d, we want %d" \
+ % (shnum, k, self._required_shares)
raise CorruptShareError(peerid, shnum,
"share has k=%d, we want k=%d" %
(k, self._required_shares))
if N != self._total_shares:
+ self._status.problems[peerid] = "sh#%d: N=%d, we want %d" \
+ % (shnum, N, self._total_shares)
raise CorruptShareError(peerid, shnum,
"share has N=%d, we want N=%d" %
(N, self._total_shares))
level=log.UNUSUAL)
# are there any peers on the list that we haven't used?
new_query_peers = []
- for (peerid, ss) in self._peerlist:
+ peer_indicies = []
+ for i, (peerid, ss) in enumerate(self._peerlist):
if peerid not in self._used_peers:
new_query_peers.append( (peerid, ss) )
+ peer_indicies.append(i)
if len(new_query_peers) > 5:
# only query in batches of 5. TODO: this is pretty
# arbitrary, really I want this to be something like
# k - max(known_version_sharecounts) + some extra
break
+ new_search_distance = max(max(peer_indicies),
+ self._status.get_search_distance())
+ self._status.set_search_distance(new_search_distance)
if new_query_peers:
self.log("sending %d new queries (read %d bytes)" %
(len(new_query_peers), self._read_size), level=log.UNUSUAL)
# now that the big loop is done, all shares in the sharemap are
# valid, and they're all for the same seqnum+root_hash version, so
# it's now down to doing FEC and decrypt.
+ elapsed = time.time() - self._started
+ self._status.timings["fetch"] = elapsed
assert len(shares) >= self._required_shares, len(shares)
d = defer.maybeDeferred(self._decode, shares, segsize, datalength)
d.addCallback(self._decrypt, IV, seqnum, root_hash)
self.log("params %s, we have %d shares" % (params, len(shares)))
self.log("about to decode, shareids=%s" % (shareids,))
+ started = time.time()
d = defer.maybeDeferred(fec.decode, shares, shareids)
def _done(buffers):
+ elapsed = time.time() - started
+ self._status.timings["decode"] = elapsed
+ self._status.set_encoding(self._required_shares, self._total_shares)
self.log(" decode done, %d buffers" % len(buffers))
segment = "".join(buffers)
self.log(" joined length %d, datalength %d" %
return d
def _decrypt(self, crypttext, IV, seqnum, root_hash):
+ started = time.time()
key = hashutil.ssk_readkey_data_hash(IV, self._readkey)
decryptor = AES(key)
plaintext = decryptor.process(crypttext)
+ elapsed = time.time() - started
+ self._status.timings["decrypt"] = elapsed
# it worked, so record the seqnum and root_hash for next time
self._node._populate_seqnum(seqnum)
self._node._populate_root_hash(root_hash)
self._status.set_status("Done")
self._status.set_progress(1.0)
self._status.set_size(len(contents))
+ elapsed = time.time() - self._started
+ self._status.timings["total"] = elapsed
eventually(self._done_deferred.callback, contents)
def get_status(self):
# XXX add redirect_to_result
return d
-def plural(sequence):
- if len(sequence) == 1:
+def plural(sequence_or_length):
+ if isinstance(sequence_or_length, int):
+ length = sequence_or_length
+ else:
+ length = len(sequence_or_length)
+ if length == 1:
return ""
return "s"
-class UploadResultsRendererMixin:
+class RateAndTimeMixin:
+
+ def render_time(self, ctx, data):
+ # 1.23s, 790ms, 132us
+ if data is None:
+ return ""
+ s = float(data)
+ if s >= 1.0:
+ return "%.2fs" % s
+ if s >= 0.01:
+ return "%dms" % (1000*s)
+ if s >= 0.001:
+ return "%.1fms" % (1000*s)
+ return "%dus" % (1000000*s)
+
+ def render_rate(self, ctx, data):
+ # 21.8kBps, 554.4kBps 4.37MBps
+ if data is None:
+ return ""
+ r = float(data)
+ if r > 1000000:
+ return "%1.2fMBps" % (r/1000000)
+ if r > 1000:
+ return "%.1fkBps" % (r/1000)
+ return "%dBps" % r
+
+class UploadResultsRendererMixin(RateAndTimeMixin):
# this requires a method named 'upload_results'
def render_sharemap(self, ctx, data):
d.addCallback(lambda res: res.file_size)
return d
- def render_time(self, ctx, data):
- # 1.23s, 790ms, 132us
- if data is None:
- return ""
- s = float(data)
- if s >= 1.0:
- return "%.2fs" % s
- if s >= 0.01:
- return "%dms" % (1000*s)
- if s >= 0.001:
- return "%.1fms" % (1000*s)
- return "%dus" % (1000000*s)
-
- def render_rate(self, ctx, data):
- # 21.8kBps, 554.4kBps 4.37MBps
- if data is None:
- return ""
- r = float(data)
- if r > 1000000:
- return "%1.2fMBps" % (r/1000000)
- if r > 1000:
- return "%.1fkBps" % (r/1000)
- return "%dBps" % r
-
def _get_time(self, name):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get(name))
def render_status(self, ctx, data):
return data.get_status()
-class DownloadResultsRendererMixin:
+class DownloadResultsRendererMixin(RateAndTimeMixin):
# this requires a method named 'download_results'
def render_servermap(self, ctx, data):
d.addCallback(lambda res: res.file_size)
return d
- def render_time(self, ctx, data):
- # 1.23s, 790ms, 132us
- if data is None:
- return ""
- s = float(data)
- if s >= 1.0:
- return "%.2fs" % s
- if s >= 0.01:
- return "%dms" % (1000*s)
- if s >= 0.001:
- return "%.1fms" % (1000*s)
- return "%dus" % (1000000*s)
-
- def render_rate(self, ctx, data):
- # 21.8kBps, 554.4kBps 4.37MBps
- if data is None:
- return ""
- r = float(data)
- if r > 1000000:
- return "%1.2fMBps" % (r/1000000)
- if r > 1000:
- return "%.1fkBps" % (r/1000)
- return "%dBps" % r
-
def _get_time(self, name):
d = self.download_results()
d.addCallback(lambda res: res.timings.get(name))
def render_status(self, ctx, data):
return data.get_status()
-class RetrieveStatusPage(rend.Page):
+class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
docFactory = getxmlfile("retrieve-status.xhtml")
+ def __init__(self, data):
+ rend.Page.__init__(self, data)
+ self.retrieve_status = data
+
def render_started(self, ctx, data):
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
started_s = time.strftime(TIME_FORMAT,
def render_status(self, ctx, data):
return data.get_status()
+ def render_encoding(self, ctx, data):
+ k, n = data.get_encoding()
+ return ctx.tag["Encoding: %s of %s" % (k, n)]
+
+ def render_search_distance(self, ctx, data):
+ d = data.get_search_distance()
+ return ctx.tag["Search Distance: %s peer%s" % (d, plural(d))]
+
+ def render_problems(self, ctx, data):
+ problems = data.problems
+ if not problems:
+ return ""
+ l = T.ul()
+ for peerid in sorted(problems.keys()):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]]
+ return ctx.tag["Server Problems:", l]
+
+ def _get_rate(self, data, name):
+ file_size = self.retrieve_status.get_size()
+ time = self.retrieve_status.timings.get(name)
+ if time is None:
+ return None
+ try:
+ return 1.0 * file_size / time
+ except ZeroDivisionError:
+ return None
+
+ def data_time_total(self, ctx, data):
+ return self.retrieve_status.timings.get("total")
+ def data_rate_total(self, ctx, data):
+ return self._get_rate(data, "total")
+
+ def data_time_peer_selection(self, ctx, data):
+ return self.retrieve_status.timings.get("peer_selection")
+
+ def data_time_fetch(self, ctx, data):
+ return self.retrieve_status.timings.get("fetch")
+ def data_rate_fetch(self, ctx, data):
+ return self._get_rate(data, "fetch")
+
+ def data_time_decode(self, ctx, data):
+ return self.retrieve_status.timings.get("decode")
+ def data_rate_decode(self, ctx, data):
+ return self._get_rate(data, "decode")
+
+ def data_time_decrypt(self, ctx, data):
+ return self.retrieve_status.timings.get("decrypt")
+ def data_rate_decrypt(self, ctx, data):
+ return self._get_rate(data, "decrypt")
+
+ def render_server_timings(self, ctx, data):
+ per_server = self.retrieve_status.timings.get("fetch_per_server")
+ if not per_server:
+ return ""
+ l = T.ul()
+ for peerid in sorted(per_server.keys()):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ times_s = ", ".join([self.render_time(None, t)
+ for t in per_server[peerid]])
+ l[T.li["[%s]: %s" % (peerid_s, times_s)]]
+ return T.li["Per-Server Fetch Response Times: ", l]
+
+
class PublishStatusPage(rend.Page):
docFactory = getxmlfile("publish-status.xhtml")
def data_recent_operations(self, ctx, data):
recent = [o for o in (IClient(ctx).list_recent_uploads() +
- IClient(ctx).list_recent_downloads() +
- IClient(ctx).list_recent_publish() +
- IClient(ctx).list_recent_retrieve())
+ IClient(ctx).list_recent_downloads() +
+ IClient(ctx).list_recent_publish() +
+ IClient(ctx).list_recent_retrieve())
if not o.get_active()]
recent.sort(lambda a,b: cmp(a.get_started(), b.get_started()))
recent.reverse()