]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
mutable WIP: add servermap update status pages
authorBrian Warner <warner@allmydata.com>
Thu, 17 Apr 2008 02:05:41 +0000 (19:05 -0700)
committerBrian Warner <warner@allmydata.com>
Thu, 17 Apr 2008 02:05:41 +0000 (19:05 -0700)
src/allmydata/client.py
src/allmydata/interfaces.py
src/allmydata/mutable/node.py
src/allmydata/mutable/servermap.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/test/test_web.py
src/allmydata/web/map-update-status.xhtml [new file with mode: 0644]
src/allmydata/web/status.py

index 2cd5d48450ce0197b32115922f15332615c1d2d2..f3b9c599c949a108f7d3b2717b511a0c40e68811 100644 (file)
@@ -304,6 +304,8 @@ class Client(node.Node, testutil.PollMixin):
         self.getServiceNamed("mutable-watcher").notify_publish(publish_status)
     def notify_retrieve(self, retrieve_status):
         self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
+    def notify_mapupdate(self, update_status):
+        self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
 
     def create_empty_dirnode(self):
         n = NewDirectoryNode(self)
@@ -357,6 +359,16 @@ class Client(node.Node, testutil.PollMixin):
         downloader = self.getServiceNamed("downloader")
         return downloader.list_recent_downloads()
 
+    def list_all_mapupdate(self):
+        watcher = self.getServiceNamed("mutable-watcher")
+        return watcher.list_all_mapupdate()
+    def list_active_mapupdate(self):
+        watcher = self.getServiceNamed("mutable-watcher")
+        return watcher.list_active_mapupdate()
+    def list_recent_mapupdate(self):
+        watcher = self.getServiceNamed("mutable-watcher")
+        return watcher.list_recent_mapupdate()
+
     def list_all_publish(self):
         watcher = self.getServiceNamed("mutable-watcher")
         return watcher.list_all_publish()
index fff146dbadd16473723b128f07560393c5b1bd72..a90a9f437cd1500510736566895c7275512333cf 100644 (file)
@@ -1559,6 +1559,8 @@ class IDownloadStatus(Interface):
         that number. This provides a handle to this particular download, so a
         web page can generate a suitable hyperlink."""
 
+class IServermapUpdaterStatus(Interface):
+    pass
 class IPublishStatus(Interface):
     pass
 class IRetrieveStatus(Interface):
index 31ae0136d4fb2667a3fc229204f15abe93e90d4e..77f05a42218264cc83860b69767830c73431d82c 100644 (file)
@@ -215,8 +215,7 @@ class MutableFileNode:
     def update_servermap(self, old_map=None, mode=MODE_READ):
         servermap = old_map or ServerMap()
         d = self.obtain_lock()
-        d.addCallback(lambda res:
-                      ServermapUpdater(self, servermap, mode).update())
+        d.addCallback(lambda res: self._update_servermap(servermap, mode))
         d.addBoth(self.release_lock)
         return d
 
@@ -263,6 +262,10 @@ class MutableFileNode:
         verifier = self.get_verifier()
         return self._client.getServiceNamed("checker").check(verifier)
 
+    def _update_servermap(self, old_map, mode):
+        u = ServermapUpdater(self, old_map, mode)
+        self._client.notify_mapupdate(u.get_status())
+        return u.update()
 
     def _retrieve(self, servermap, verinfo):
         r = Retrieve(self, servermap, verinfo)
@@ -325,6 +328,7 @@ class MutableFileNode:
 
 
 class MutableWatcher(service.MultiService):
+    MAX_MAPUPDATE_STATUSES = 20
     MAX_PUBLISH_STATUSES = 20
     MAX_RETRIEVE_STATUSES = 20
     name = "mutable-watcher"
@@ -332,11 +336,28 @@ class MutableWatcher(service.MultiService):
     def __init__(self, stats_provider=None):
         service.MultiService.__init__(self)
         self.stats_provider = stats_provider
+        self._all_mapupdate_status = weakref.WeakKeyDictionary()
+        self._recent_mapupdate_status = []
         self._all_publish_status = weakref.WeakKeyDictionary()
         self._recent_publish_status = []
         self._all_retrieve_status = weakref.WeakKeyDictionary()
         self._recent_retrieve_status = []
 
+
+    def notify_mapupdate(self, p):
+        self._all_mapupdate_status[p] = None
+        self._recent_mapupdate_status.append(p)
+        while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
+            self._recent_mapupdate_status.pop(0)
+
+    def list_all_mapupdate(self):
+        return self._all_mapupdate_status.keys()
+    def list_active_mapupdate(self):
+        return [p for p in self._all_mapupdate_status.keys() if p.get_active()]
+    def list_recent_mapupdate(self):
+        return self._recent_mapupdate_status
+
+
     def notify_publish(self, p):
         self._all_publish_status[p] = None
         self._recent_publish_status.append(p)
index edd2a8d66417670983d38bef2b92e077a840a8b2..8d469708e3b0de61424a25f9c90c4432cf2dbfc9 100644 (file)
@@ -1,16 +1,78 @@
 
 import sys, time
+from zope.interface import implements
+from itertools import count
 from twisted.internet import defer
 from twisted.python import failure
 from foolscap.eventual import eventually
 from allmydata.util import base32, hashutil, idlib, log
 from allmydata import storage
+from allmydata.interfaces import IServermapUpdaterStatus
 from pycryptopp.publickey import rsa
 
 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
      DictOfSets, CorruptShareError, NeedMoreDataError
 from layout import unpack_prefix_and_signature, unpack_header, unpack_share
 
+class UpdateStatus:
+    implements(IServermapUpdaterStatus)
+    statusid_counter = count(0)
+    def __init__(self):
+        self.timings = {}
+        self.timings["per_server"] = {}
+        self.timings["cumulative_verify"] = 0.0
+        self.privkey_from = None
+        self.problems = {}
+        self.active = True
+        self.storage_index = None
+        self.mode = "?"
+        self.status = "Not started"
+        self.progress = 0.0
+        self.counter = self.statusid_counter.next()
+        self.started = time.time()
+
+    def add_per_server_time(self, peerid, op, elapsed):
+        assert op in ("query", "privkey")
+        if peerid not in self.timings["per_server"]:
+            self.timings["per_server"][peerid] = []
+        self.timings["per_server"][peerid].append((op,elapsed))
+
+    def get_started(self):
+        return self.started
+    def get_storage_index(self):
+        return self.storage_index
+    def get_mode(self):
+        return self.mode
+    def get_servermap(self):
+        return self.servermap
+    def get_privkey_from(self):
+        return self.privkey_from
+    def using_helper(self):
+        return False
+    def get_size(self):
+        return "-NA-"
+    def get_status(self):
+        return self.status
+    def get_progress(self):
+        return self.progress
+    def get_active(self):
+        return self.active
+    def get_counter(self):
+        return self.counter
+
+    def set_storage_index(self, si):
+        self.storage_index = si
+    def set_mode(self, mode):
+        self.mode = mode
+    def set_privkey_from(self, peerid):
+        self.privkey_from = peerid
+    def set_status(self, status):
+        self.status = status
+    def set_progress(self, value):
+        self.progress = value
+    def set_active(self, value):
+        self.active = value
+
 class ServerMap:
     """I record the placement of mutable shares.
 
@@ -216,6 +278,11 @@ class ServermapUpdater:
         self._storage_index = filenode.get_storage_index()
         self._last_failure = None
 
+        self._status = UpdateStatus()
+        self._status.set_storage_index(self._storage_index)
+        self._status.set_progress(0.0)
+        self._status.set_mode(mode)
+
         # how much data should we read?
         #  * if we only need the checkstring, then [0:75]
         #  * if we need to validate the checkstring sig, then [543ish:799ish]
@@ -240,6 +307,9 @@ class ServermapUpdater:
         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                    si=prefix, mode=mode)
 
+    def get_status(self):
+        return self._status
+
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_number
@@ -248,6 +318,8 @@ class ServermapUpdater:
     def update(self):
         """Update the servermap to reflect current conditions. Returns a
         Deferred that fires with the servermap once the update has finished."""
+        self._started = time.time()
+        self._status.set_active(True)
 
         # self._valid_versions is a set of validated verinfo tuples. We just
         # use it to remember which versions had valid signatures, so we can
@@ -259,7 +331,6 @@ class ServermapUpdater:
         # be retrievable, and to make the eventual data download faster.
         self.versionmap = DictOfSets()
 
-        self._started = time.time()
         self._done_deferred = defer.Deferred()
 
         # first, which peers should be talk to? Any that were in our old
@@ -322,6 +393,7 @@ class ServermapUpdater:
         # enough responses)
 
         self._send_initial_requests(initial_peers_to_query)
+        self._status.timings["setup"] = time.time() - self._started
         return self._done_deferred
 
     def _build_initial_querylist(self):
@@ -342,6 +414,7 @@ class ServermapUpdater:
         return initial_peers_to_query, must_query
 
     def _send_initial_requests(self, peerlist):
+        self._status.set_status("Sending %d initial queries" % len(peerlist))
         self._queries_outstanding = set()
         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
         dl = []
@@ -382,6 +455,9 @@ class ServermapUpdater:
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
+        now = time.time()
+        elapsed = now - started
+        self._status.add_per_server_time(peerid, "query", elapsed)
         self._queries_outstanding.discard(peerid)
         self._must_query.discard(peerid)
         self._queries_completed += 1
@@ -412,6 +488,8 @@ class ServermapUpdater:
                 self._servermap.problems.append(f)
                 pass
 
+        self._status.timings["cumulative_verify"] += (time.time() - now)
+
         if self._need_privkey and last_verinfo:
             # send them a request for the privkey. We send one request per
             # server.
@@ -422,9 +500,11 @@ class ServermapUpdater:
             self._queries_outstanding.add(peerid)
             readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
             ss = self._servermap.connections[peerid]
+            privkey_started = time.time()
             d = self._do_read(ss, peerid, self._storage_index,
                               [last_shnum], readv)
-            d.addCallback(self._got_privkey_results, peerid, last_shnum)
+            d.addCallback(self._got_privkey_results, peerid, last_shnum,
+                          privkey_started)
             d.addErrback(self._privkey_query_failed, peerid, last_shnum)
             d.addErrback(log.err)
             d.addCallback(self._check_for_done)
@@ -540,6 +620,7 @@ class ServermapUpdater:
         self._node._populate_encprivkey(enc_privkey)
         self._node._populate_privkey(privkey)
         self._need_privkey = False
+        self._status.set_privkey_from(peerid)
 
 
     def _query_failed(self, f, peerid):
@@ -554,7 +635,10 @@ class ServermapUpdater:
         self._queries_completed += 1
         self._last_failure = f
 
-    def _got_privkey_results(self, datavs, peerid, shnum):
+    def _got_privkey_results(self, datavs, peerid, shnum, started):
+        now = time.time()
+        elapsed = now - started
+        self._status.add_per_server_time(peerid, "privkey", elapsed)
         self._queries_outstanding.discard(peerid)
         if not self._need_privkey:
             return
@@ -769,6 +853,12 @@ class ServermapUpdater:
         if not self._running:
             return
         self._running = False
+        elapsed = time.time() - self._started
+        self._status.timings["total"] = elapsed
+        self._status.set_progress(1.0)
+        self._status.set_status("Done")
+        self._status.set_active(False)
+
         self._servermap.last_update_mode = self.mode
         self._servermap.last_update_time = self._started
         # the servermap will not be touched after this
@@ -779,3 +869,4 @@ class ServermapUpdater:
         self.log("fatal error", failure=f, level=log.WEIRD)
         self._done_deferred.errback(f)
 
+
index b49defe6353dceae124e92b899806e166a7cd1ae..620f26e6fda7ea6f60772f243247a3fb103fc6cd 100644 (file)
@@ -168,6 +168,8 @@ class FakeClient:
         pass
     def notify_publish(self, p):
         pass
+    def notify_mapupdate(self, u):
+        pass
 
     def create_node_from_uri(self, u):
         u = IURI(u)
index 565dfe6b9f22a46d4b81eefb963e6e678e61a2f6..00ae1caae441b517e38bacd7065e89ffc96c23aa 100644 (file)
@@ -1291,10 +1291,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
             for ul in self.clients[0].list_recent_uploads():
                 if ul.get_size() > 200:
                     self._up_status = ul.get_counter()
-            #rs = self.clients[0].list_recent_retrieve()[0]
-            #self._retrieve_status = rs.get_counter()
-            #ps = self.clients[0].list_recent_publish()[0]
-            #self._publish_status = ps.get_counter()
+            rs = self.clients[0].list_recent_retrieve()[0]
+            self._retrieve_status = rs.get_counter()
+            ps = self.clients[0].list_recent_publish()[0]
+            self._publish_status = ps.get_counter()
 
             # and that there are some upload- and download- status pages
             return self.GET("status/up-%d" % self._up_status)
@@ -1304,10 +1304,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
         d.addCallback(_got_up)
         def _got_down(res):
             return self.GET("status/publish-%d" % self._publish_status)
-        #d.addCallback(_got_down) # TODO: disabled during #303 refactoring
+        d.addCallback(_got_down)
         def _got_publish(res):
             return self.GET("status/retrieve-%d" % self._retrieve_status)
-        #d.addCallback(_got_publish)
+        d.addCallback(_got_publish)
 
         # check that the helper status page exists
         d.addCallback(lambda res:
index cbe20864961cc9e6674a79ba70a66c82bfadc4bd..c9a7b96d37448989dbb16d1143321345a347e70e 100644 (file)
@@ -85,6 +85,10 @@ class FakeClient(service.MultiService):
         return []
     def list_active_retrieve(self):
         return []
+    def list_active_mapupdate(self):
+        return []
+    def list_recent_mapupdate(self):
+        return []
 
     def list_recent_uploads(self):
         return self._all_upload_status
diff --git a/src/allmydata/web/map-update-status.xhtml b/src/allmydata/web/map-update-status.xhtml
new file mode 100644 (file)
index 0000000..b51896b
--- /dev/null
@@ -0,0 +1,38 @@
+<html xmlns:n="http://nevow.com/ns/nevow/0.1">
+  <head>
+    <title>AllMyData - Tahoe - Mutable File Servermap Update Status</title>
+    <!-- <link href="http://www.allmydata.com/common/css/styles.css"
+          rel="stylesheet" type="text/css"/> -->
+    <link href="/webform_css" rel="stylesheet" type="text/css"/>
+    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+  </head>
+  <body>
+
+<h1>Mutable File Servermap Update Status</h1>
+
+<ul>
+  <li>Started: <span n:render="started"/></li>
+  <li>Storage Index: <span n:render="si"/></li>
+  <li>Helper?: <span n:render="helper"/></li>
+  <li>Progress: <span n:render="progress"/></li>
+  <li>Status: <span n:render="status"/></li>
+</ul>
+
+<h2>Update Results</h2>
+<ul>
+  <li n:render="problems" />
+  <li>Timings:</li>
+  <ul>
+    <li>Total: <span n:render="time" n:data="time_total" /></li>
+    <ul>
+      <li>Setup: <span n:render="time" n:data="time_setup" /></li>
+      <li n:render="privkey_from" />
+      <li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
+    </ul>
+    <li n:render="server_timings" />
+  </ul>
+</ul>
+
+<div>Return to the <a href="/">Welcome Page</a></div>
+
+</body></html>
index 2e7788836b207faac4c9c945e9a4707970c0a82b..0b51b2b75c9949400f93fa6e7c648386289ec862 100644 (file)
@@ -7,7 +7,7 @@ from allmydata.util import base32, idlib
 from allmydata.web.common import IClient, getxmlfile, abbreviate_time, \
      abbreviate_rate, abbreviate_size, get_arg
 from allmydata.interfaces import IUploadStatus, IDownloadStatus, \
-     IPublishStatus, IRetrieveStatus
+     IPublishStatus, IRetrieveStatus, IServermapUpdaterStatus
 
 def plural(sequence_or_length):
     if isinstance(sequence_or_length, int):
@@ -623,6 +623,81 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
             l[T.li["[%s]: %s" % (peerid_s, times_s)]]
         return T.li["Per-Server Response Times: ", l]
 
+class MapupdateStatusPage(rend.Page, RateAndTimeMixin):
+    docFactory = getxmlfile("map-update-status.xhtml")
+
+    def __init__(self, data):
+        rend.Page.__init__(self, data)
+        self.update_status = data
+
+    def render_started(self, ctx, data):
+        TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
+        started_s = time.strftime(TIME_FORMAT,
+                                  time.localtime(data.get_started()))
+        return started_s
+
+    def render_si(self, ctx, data):
+        si_s = base32.b2a_or_none(data.get_storage_index())
+        if si_s is None:
+            si_s = "(None)"
+        return si_s
+
+    def render_helper(self, ctx, data):
+        return {True: "Yes",
+                False: "No"}[data.using_helper()]
+
+    def render_progress(self, ctx, data):
+        progress = data.get_progress()
+        # TODO: make an ascii-art bar
+        return "%.1f%%" % (100.0 * progress)
+
+    def render_status(self, ctx, data):
+        return data.get_status()
+
+    def render_problems(self, ctx, data):
+        problems = data.problems
+        if not problems:
+            return ""
+        l = T.ul()
+        for peerid in sorted(problems.keys()):
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]]
+        return ctx.tag["Server Problems:", l]
+
+    def render_privkey_from(self, ctx, data):
+        peerid = data.get_privkey_from()
+        if peerid:
+            return ctx.tag["Got privkey from: [%s]"
+                           % idlib.shortnodeid_b2a(peerid)]
+        else:
+            return ""
+
+    def data_time_total(self, ctx, data):
+        return self.update_status.timings.get("total")
+
+    def data_time_setup(self, ctx, data):
+        return self.update_status.timings.get("setup")
+
+    def data_time_cumulative_verify(self, ctx, data):
+        return self.update_status.timings.get("cumulative_verify")
+
+    def render_server_timings(self, ctx, data):
+        per_server = self.update_status.timings.get("per_server")
+        if not per_server:
+            return ""
+        l = T.ul()
+        for peerid in sorted(per_server.keys()):
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            times = []
+            for op,t in per_server[peerid]:
+                if op == "query":
+                    times.append( self.render_time(None, t) )
+                else:
+                    times.append( "(" + self.render_time(None, t) + ")" )
+            times_s = ", ".join(times)
+            l[T.li["[%s]: %s" % (peerid_s, times_s)]]
+        return T.li["Per-Server Response Times: ", l]
+
 
 class Status(rend.Page):
     docFactory = getxmlfile("status.xhtml")
@@ -632,6 +707,7 @@ class Status(rend.Page):
         client = IClient(ctx)
         active =  (client.list_active_uploads() +
                    client.list_active_downloads() +
+                   client.list_active_mapupdate() +
                    client.list_active_publish() +
                    client.list_active_retrieve() +
                    client.list_active_helper_statuses())
@@ -641,6 +717,7 @@ class Status(rend.Page):
         client = IClient(ctx)
         recent = [o for o in (client.list_recent_uploads() +
                               client.list_recent_downloads() +
+                              client.list_recent_mapupdate() +
                               client.list_recent_publish() +
                               client.list_recent_retrieve() +
                               client.list_recent_helper_statuses())
@@ -688,11 +765,15 @@ class Status(rend.Page):
             link = "publish-%d" % data.get_counter()
             ctx.fillSlots("type", "publish")
             ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
-        else:
-            assert IRetrieveStatus.providedBy(data)
+        elif IRetrieveStatus.providedBy(data):
             ctx.fillSlots("type", "retrieve")
             link = "retrieve-%d" % data.get_counter()
             ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
+        else:
+            assert IServermapUpdaterStatus.providedBy(data)
+            ctx.fillSlots("type", "mapupdate %s" % data.get_mode())
+            link = "mapupdate-%d" % data.get_counter()
+            ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
         ctx.fillSlots("status", T.a(href=link)[s.get_status()])
         return ctx.tag
 
@@ -723,6 +804,14 @@ class Status(rend.Page):
                 s = d.get_download_status()
                 if s.get_counter() == count:
                     return DownloadStatusPage(s)
+        if stype == "mapupdate":
+            for s in client.list_recent_mapupdate():
+                if s.get_counter() == count:
+                    return MapupdateStatusPage(s)
+            for p in client.list_all_mapupdate():
+                s = p.get_status()
+                if s.get_counter() == count:
+                    return MapupdateStatusPage(s)
         if stype == "publish":
             for s in client.list_recent_publish():
                 if s.get_counter() == count: