]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable/servermap.py
mutable WIP: add servermap update status pages
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
index edd2a8d66417670983d38bef2b92e077a840a8b2..8d469708e3b0de61424a25f9c90c4432cf2dbfc9 100644 (file)
@@ -1,16 +1,78 @@
 
 import sys, time
+from zope.interface import implements
+from itertools import count
 from twisted.internet import defer
 from twisted.python import failure
 from foolscap.eventual import eventually
 from allmydata.util import base32, hashutil, idlib, log
 from allmydata import storage
+from allmydata.interfaces import IServermapUpdaterStatus
 from pycryptopp.publickey import rsa
 
 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
      DictOfSets, CorruptShareError, NeedMoreDataError
 from layout import unpack_prefix_and_signature, unpack_header, unpack_share
 
+class UpdateStatus:
+    implements(IServermapUpdaterStatus)
+    statusid_counter = count(0)
+    def __init__(self):
+        self.timings = {}
+        self.timings["per_server"] = {}
+        self.timings["cumulative_verify"] = 0.0
+        self.privkey_from = None
+        self.problems = {}
+        self.active = True
+        self.storage_index = None
+        self.mode = "?"
+        self.status = "Not started"
+        self.progress = 0.0
+        self.counter = self.statusid_counter.next()
+        self.started = time.time()
+
+    def add_per_server_time(self, peerid, op, elapsed):
+        assert op in ("query", "privkey")
+        if peerid not in self.timings["per_server"]:
+            self.timings["per_server"][peerid] = []
+        self.timings["per_server"][peerid].append((op,elapsed))
+
+    def get_started(self):
+        return self.started
+    def get_storage_index(self):
+        return self.storage_index
+    def get_mode(self):
+        return self.mode
+    def get_servermap(self):
+        return self.servermap
+    def get_privkey_from(self):
+        return self.privkey_from
+    def using_helper(self):
+        return False
+    def get_size(self):
+        return "-NA-"
+    def get_status(self):
+        return self.status
+    def get_progress(self):
+        return self.progress
+    def get_active(self):
+        return self.active
+    def get_counter(self):
+        return self.counter
+
+    def set_storage_index(self, si):
+        self.storage_index = si
+    def set_mode(self, mode):
+        self.mode = mode
+    def set_privkey_from(self, peerid):
+        self.privkey_from = peerid
+    def set_status(self, status):
+        self.status = status
+    def set_progress(self, value):
+        self.progress = value
+    def set_active(self, value):
+        self.active = value
+
 class ServerMap:
     """I record the placement of mutable shares.
 
@@ -216,6 +278,11 @@ class ServermapUpdater:
         self._storage_index = filenode.get_storage_index()
         self._last_failure = None
 
+        self._status = UpdateStatus()
+        self._status.set_storage_index(self._storage_index)
+        self._status.set_progress(0.0)
+        self._status.set_mode(mode)
+
         # how much data should we read?
         #  * if we only need the checkstring, then [0:75]
         #  * if we need to validate the checkstring sig, then [543ish:799ish]
@@ -240,6 +307,9 @@ class ServermapUpdater:
         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                    si=prefix, mode=mode)
 
+    def get_status(self):
+        return self._status
+
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_number
@@ -248,6 +318,8 @@ class ServermapUpdater:
     def update(self):
         """Update the servermap to reflect current conditions. Returns a
         Deferred that fires with the servermap once the update has finished."""
+        self._started = time.time()
+        self._status.set_active(True)
 
         # self._valid_versions is a set of validated verinfo tuples. We just
         # use it to remember which versions had valid signatures, so we can
@@ -259,7 +331,6 @@ class ServermapUpdater:
         # be retrievable, and to make the eventual data download faster.
         self.versionmap = DictOfSets()
 
-        self._started = time.time()
         self._done_deferred = defer.Deferred()
 
         # first, which peers should be talk to? Any that were in our old
@@ -322,6 +393,7 @@ class ServermapUpdater:
         # enough responses)
 
         self._send_initial_requests(initial_peers_to_query)
+        self._status.timings["setup"] = time.time() - self._started
         return self._done_deferred
 
     def _build_initial_querylist(self):
@@ -342,6 +414,7 @@ class ServermapUpdater:
         return initial_peers_to_query, must_query
 
     def _send_initial_requests(self, peerlist):
+        self._status.set_status("Sending %d initial queries" % len(peerlist))
         self._queries_outstanding = set()
         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
         dl = []
@@ -382,6 +455,9 @@ class ServermapUpdater:
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
+        now = time.time()
+        elapsed = now - started
+        self._status.add_per_server_time(peerid, "query", elapsed)
         self._queries_outstanding.discard(peerid)
         self._must_query.discard(peerid)
         self._queries_completed += 1
@@ -412,6 +488,8 @@ class ServermapUpdater:
                 self._servermap.problems.append(f)
                 pass
 
+        self._status.timings["cumulative_verify"] += (time.time() - now)
+
         if self._need_privkey and last_verinfo:
             # send them a request for the privkey. We send one request per
             # server.
@@ -422,9 +500,11 @@ class ServermapUpdater:
             self._queries_outstanding.add(peerid)
             readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
             ss = self._servermap.connections[peerid]
+            privkey_started = time.time()
             d = self._do_read(ss, peerid, self._storage_index,
                               [last_shnum], readv)
-            d.addCallback(self._got_privkey_results, peerid, last_shnum)
+            d.addCallback(self._got_privkey_results, peerid, last_shnum,
+                          privkey_started)
             d.addErrback(self._privkey_query_failed, peerid, last_shnum)
             d.addErrback(log.err)
             d.addCallback(self._check_for_done)
@@ -540,6 +620,7 @@ class ServermapUpdater:
         self._node._populate_encprivkey(enc_privkey)
         self._node._populate_privkey(privkey)
         self._need_privkey = False
+        self._status.set_privkey_from(peerid)
 
 
     def _query_failed(self, f, peerid):
@@ -554,7 +635,10 @@ class ServermapUpdater:
         self._queries_completed += 1
         self._last_failure = f
 
-    def _got_privkey_results(self, datavs, peerid, shnum):
+    def _got_privkey_results(self, datavs, peerid, shnum, started):
+        now = time.time()
+        elapsed = now - started
+        self._status.add_per_server_time(peerid, "privkey", elapsed)
         self._queries_outstanding.discard(peerid)
         if not self._need_privkey:
             return
@@ -769,6 +853,12 @@ class ServermapUpdater:
         if not self._running:
             return
         self._running = False
+        elapsed = time.time() - self._started
+        self._status.timings["total"] = elapsed
+        self._status.set_progress(1.0)
+        self._status.set_status("Done")
+        self._status.set_active(False)
+
         self._servermap.last_update_mode = self.mode
         self._servermap.last_update_time = self._started
         # the servermap will not be touched after this
@@ -779,3 +869,4 @@ class ServermapUpdater:
         self.log("fatal error", failure=f, level=log.WEIRD)
         self._done_deferred.errback(f)
 
+