import sys, time
+from zope.interface import implements
+from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.eventual import eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata import storage
+from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa
from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share
+class UpdateStatus:
+ implements(IServermapUpdaterStatus)
+ statusid_counter = count(0)
+ def __init__(self):
+ self.timings = {}
+ self.timings["per_server"] = {}
+ self.timings["cumulative_verify"] = 0.0
+ self.privkey_from = None
+ self.problems = {}
+ self.active = True
+ self.storage_index = None
+ self.mode = "?"
+ self.status = "Not started"
+ self.progress = 0.0
+ self.counter = self.statusid_counter.next()
+ self.started = time.time()
+
+ def add_per_server_time(self, peerid, op, elapsed):
+ assert op in ("query", "privkey")
+ if peerid not in self.timings["per_server"]:
+ self.timings["per_server"][peerid] = []
+ self.timings["per_server"][peerid].append((op,elapsed))
+
+ def get_started(self):
+ return self.started
+ def get_storage_index(self):
+ return self.storage_index
+ def get_mode(self):
+ return self.mode
+ def get_servermap(self):
+ return self.servermap
+ def get_privkey_from(self):
+ return self.privkey_from
+ def using_helper(self):
+ return False
+ def get_size(self):
+ return "-NA-"
+ def get_status(self):
+ return self.status
+ def get_progress(self):
+ return self.progress
+ def get_active(self):
+ return self.active
+ def get_counter(self):
+ return self.counter
+
+ def set_storage_index(self, si):
+ self.storage_index = si
+ def set_mode(self, mode):
+ self.mode = mode
+ def set_privkey_from(self, peerid):
+ self.privkey_from = peerid
+ def set_status(self, status):
+ self.status = status
+ def set_progress(self, value):
+ self.progress = value
+ def set_active(self, value):
+ self.active = value
+
class ServerMap:
"""I record the placement of mutable shares.
self._storage_index = filenode.get_storage_index()
self._last_failure = None
+ self._status = UpdateStatus()
+ self._status.set_storage_index(self._storage_index)
+ self._status.set_progress(0.0)
+ self._status.set_mode(mode)
+
# how much data should we read?
# * if we only need the checkstring, then [0:75]
# * if we need to validate the checkstring sig, then [543ish:799ish]
self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
si=prefix, mode=mode)
+ def get_status(self):
+ return self._status
+
def log(self, *args, **kwargs):
if "parent" not in kwargs:
kwargs["parent"] = self._log_number
def update(self):
"""Update the servermap to reflect current conditions. Returns a
Deferred that fires with the servermap once the update has finished."""
+ self._started = time.time()
+ self._status.set_active(True)
# self._valid_versions is a set of validated verinfo tuples. We just
# use it to remember which versions had valid signatures, so we can
# be retrievable, and to make the eventual data download faster.
self.versionmap = DictOfSets()
- self._started = time.time()
self._done_deferred = defer.Deferred()
# first, which peers should be talk to? Any that were in our old
# enough responses)
self._send_initial_requests(initial_peers_to_query)
+ self._status.timings["setup"] = time.time() - self._started
return self._done_deferred
def _build_initial_querylist(self):
return initial_peers_to_query, must_query
def _send_initial_requests(self, peerlist):
+ self._status.set_status("Sending %d initial queries" % len(peerlist))
self._queries_outstanding = set()
self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
dl = []
peerid=idlib.shortnodeid_b2a(peerid),
numshares=len(datavs),
level=log.NOISY)
+ now = time.time()
+ elapsed = now - started
+ self._status.add_per_server_time(peerid, "query", elapsed)
self._queries_outstanding.discard(peerid)
self._must_query.discard(peerid)
self._queries_completed += 1
self._servermap.problems.append(f)
pass
+ self._status.timings["cumulative_verify"] += (time.time() - now)
+
if self._need_privkey and last_verinfo:
# send them a request for the privkey. We send one request per
# server.
self._queries_outstanding.add(peerid)
readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
ss = self._servermap.connections[peerid]
+ privkey_started = time.time()
d = self._do_read(ss, peerid, self._storage_index,
[last_shnum], readv)
- d.addCallback(self._got_privkey_results, peerid, last_shnum)
+ d.addCallback(self._got_privkey_results, peerid, last_shnum,
+ privkey_started)
d.addErrback(self._privkey_query_failed, peerid, last_shnum)
d.addErrback(log.err)
d.addCallback(self._check_for_done)
self._node._populate_encprivkey(enc_privkey)
self._node._populate_privkey(privkey)
self._need_privkey = False
+ self._status.set_privkey_from(peerid)
def _query_failed(self, f, peerid):
self._queries_completed += 1
self._last_failure = f
- def _got_privkey_results(self, datavs, peerid, shnum):
+ def _got_privkey_results(self, datavs, peerid, shnum, started):
+ now = time.time()
+ elapsed = now - started
+ self._status.add_per_server_time(peerid, "privkey", elapsed)
self._queries_outstanding.discard(peerid)
if not self._need_privkey:
return
if not self._running:
return
self._running = False
+ elapsed = time.time() - self._started
+ self._status.timings["total"] = elapsed
+ self._status.set_progress(1.0)
+ self._status.set_status("Done")
+ self._status.set_active(False)
+
self._servermap.last_update_mode = self.mode
self._servermap.last_update_time = self._started
# the servermap will not be touched after this
self.log("fatal error", failure=f, level=log.WEIRD)
self._done_deferred.errback(f)
+