From: Brian Warner Date: Thu, 17 Apr 2008 02:05:41 +0000 (-0700) Subject: mutable WIP: add servermap update status pages X-Git-Tag: allmydata-tahoe-1.1.0~235 X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=a1670497a8110219ab9f84e6f20811b8da99bc31;p=tahoe-lafs%2Ftahoe-lafs.git mutable WIP: add servermap update status pages --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 2cd5d484..f3b9c599 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -304,6 +304,8 @@ class Client(node.Node, testutil.PollMixin): self.getServiceNamed("mutable-watcher").notify_publish(publish_status) def notify_retrieve(self, retrieve_status): self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status) + def notify_mapupdate(self, update_status): + self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status) def create_empty_dirnode(self): n = NewDirectoryNode(self) @@ -357,6 +359,16 @@ class Client(node.Node, testutil.PollMixin): downloader = self.getServiceNamed("downloader") return downloader.list_recent_downloads() + def list_all_mapupdate(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_all_mapupdate() + def list_active_mapupdate(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_active_mapupdate() + def list_recent_mapupdate(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_recent_mapupdate() + def list_all_publish(self): watcher = self.getServiceNamed("mutable-watcher") return watcher.list_all_publish() diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index fff146db..a90a9f43 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1559,6 +1559,8 @@ class IDownloadStatus(Interface): that number. This provides a handle to this particular download, so a web page can generate a suitable hyperlink.""" +class IServermapUpdaterStatus(Interface): + pass class IPublishStatus(Interface): pass class IRetrieveStatus(Interface): diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py index 31ae0136..77f05a42 100644 --- a/src/allmydata/mutable/node.py +++ b/src/allmydata/mutable/node.py @@ -215,8 +215,7 @@ class MutableFileNode: def update_servermap(self, old_map=None, mode=MODE_READ): servermap = old_map or ServerMap() d = self.obtain_lock() - d.addCallback(lambda res: - ServermapUpdater(self, servermap, mode).update()) + d.addCallback(lambda res: self._update_servermap(servermap, mode)) d.addBoth(self.release_lock) return d @@ -263,6 +262,10 @@ class MutableFileNode: verifier = self.get_verifier() return self._client.getServiceNamed("checker").check(verifier) + def _update_servermap(self, old_map, mode): + u = ServermapUpdater(self, old_map, mode) + self._client.notify_mapupdate(u.get_status()) + return u.update() def _retrieve(self, servermap, verinfo): r = Retrieve(self, servermap, verinfo) @@ -325,6 +328,7 @@ class MutableFileNode: class MutableWatcher(service.MultiService): + MAX_MAPUPDATE_STATUSES = 20 MAX_PUBLISH_STATUSES = 20 MAX_RETRIEVE_STATUSES = 20 name = "mutable-watcher" @@ -332,11 +336,28 @@ class MutableWatcher(service.MultiService): def __init__(self, stats_provider=None): service.MultiService.__init__(self) self.stats_provider = stats_provider + self._all_mapupdate_status = weakref.WeakKeyDictionary() + self._recent_mapupdate_status = [] self._all_publish_status = weakref.WeakKeyDictionary() self._recent_publish_status = [] self._all_retrieve_status = weakref.WeakKeyDictionary() self._recent_retrieve_status = [] + + def notify_mapupdate(self, p): + self._all_mapupdate_status[p] = None + self._recent_mapupdate_status.append(p) + while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES: + self._recent_mapupdate_status.pop(0) + + def list_all_mapupdate(self): + return self._all_mapupdate_status.keys() + def list_active_mapupdate(self): + return [p for p in self._all_mapupdate_status.keys() if p.get_active()] + def list_recent_mapupdate(self): + return self._recent_mapupdate_status + + def notify_publish(self, p): self._all_publish_status[p] = None self._recent_publish_status.append(p) diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index edd2a8d6..8d469708 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -1,16 +1,78 @@ import sys, time +from zope.interface import implements +from itertools import count from twisted.internet import defer from twisted.python import failure from foolscap.eventual import eventually from allmydata.util import base32, hashutil, idlib, log from allmydata import storage +from allmydata.interfaces import IServermapUpdaterStatus from pycryptopp.publickey import rsa from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \ DictOfSets, CorruptShareError, NeedMoreDataError from layout import unpack_prefix_and_signature, unpack_header, unpack_share +class UpdateStatus: + implements(IServermapUpdaterStatus) + statusid_counter = count(0) + def __init__(self): + self.timings = {} + self.timings["per_server"] = {} + self.timings["cumulative_verify"] = 0.0 + self.privkey_from = None + self.problems = {} + self.active = True + self.storage_index = None + self.mode = "?" + self.status = "Not started" + self.progress = 0.0 + self.counter = self.statusid_counter.next() + self.started = time.time() + + def add_per_server_time(self, peerid, op, elapsed): + assert op in ("query", "privkey") + if peerid not in self.timings["per_server"]: + self.timings["per_server"][peerid] = [] + self.timings["per_server"][peerid].append((op,elapsed)) + + def get_started(self): + return self.started + def get_storage_index(self): + return self.storage_index + def get_mode(self): + return self.mode + def get_servermap(self): + return self.servermap + def get_privkey_from(self): + return self.privkey_from + def using_helper(self): + return False + def get_size(self): + return "-NA-" + def get_status(self): + return self.status + def get_progress(self): + return self.progress + def get_active(self): + return self.active + def get_counter(self): + return self.counter + + def set_storage_index(self, si): + self.storage_index = si + def set_mode(self, mode): + self.mode = mode + def set_privkey_from(self, peerid): + self.privkey_from = peerid + def set_status(self, status): + self.status = status + def set_progress(self, value): + self.progress = value + def set_active(self, value): + self.active = value + class ServerMap: """I record the placement of mutable shares. @@ -216,6 +278,11 @@ class ServermapUpdater: self._storage_index = filenode.get_storage_index() self._last_failure = None + self._status = UpdateStatus() + self._status.set_storage_index(self._storage_index) + self._status.set_progress(0.0) + self._status.set_mode(mode) + # how much data should we read? # * if we only need the checkstring, then [0:75] # * if we need to validate the checkstring sig, then [543ish:799ish] @@ -240,6 +307,9 @@ class ServermapUpdater: self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)", si=prefix, mode=mode) + def get_status(self): + return self._status + def log(self, *args, **kwargs): if "parent" not in kwargs: kwargs["parent"] = self._log_number @@ -248,6 +318,8 @@ class ServermapUpdater: def update(self): """Update the servermap to reflect current conditions. Returns a Deferred that fires with the servermap once the update has finished.""" + self._started = time.time() + self._status.set_active(True) # self._valid_versions is a set of validated verinfo tuples. We just # use it to remember which versions had valid signatures, so we can @@ -259,7 +331,6 @@ class ServermapUpdater: # be retrievable, and to make the eventual data download faster. self.versionmap = DictOfSets() - self._started = time.time() self._done_deferred = defer.Deferred() # first, which peers should be talk to? Any that were in our old @@ -322,6 +393,7 @@ class ServermapUpdater: # enough responses) self._send_initial_requests(initial_peers_to_query) + self._status.timings["setup"] = time.time() - self._started return self._done_deferred def _build_initial_querylist(self): @@ -342,6 +414,7 @@ class ServermapUpdater: return initial_peers_to_query, must_query def _send_initial_requests(self, peerlist): + self._status.set_status("Sending %d initial queries" % len(peerlist)) self._queries_outstanding = set() self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..] dl = [] @@ -382,6 +455,9 @@ class ServermapUpdater: peerid=idlib.shortnodeid_b2a(peerid), numshares=len(datavs), level=log.NOISY) + now = time.time() + elapsed = now - started + self._status.add_per_server_time(peerid, "query", elapsed) self._queries_outstanding.discard(peerid) self._must_query.discard(peerid) self._queries_completed += 1 @@ -412,6 +488,8 @@ class ServermapUpdater: self._servermap.problems.append(f) pass + self._status.timings["cumulative_verify"] += (time.time() - now) + if self._need_privkey and last_verinfo: # send them a request for the privkey. We send one request per # server. @@ -422,9 +500,11 @@ class ServermapUpdater: self._queries_outstanding.add(peerid) readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ] ss = self._servermap.connections[peerid] + privkey_started = time.time() d = self._do_read(ss, peerid, self._storage_index, [last_shnum], readv) - d.addCallback(self._got_privkey_results, peerid, last_shnum) + d.addCallback(self._got_privkey_results, peerid, last_shnum, + privkey_started) d.addErrback(self._privkey_query_failed, peerid, last_shnum) d.addErrback(log.err) d.addCallback(self._check_for_done) @@ -540,6 +620,7 @@ class ServermapUpdater: self._node._populate_encprivkey(enc_privkey) self._node._populate_privkey(privkey) self._need_privkey = False + self._status.set_privkey_from(peerid) def _query_failed(self, f, peerid): @@ -554,7 +635,10 @@ class ServermapUpdater: self._queries_completed += 1 self._last_failure = f - def _got_privkey_results(self, datavs, peerid, shnum): + def _got_privkey_results(self, datavs, peerid, shnum, started): + now = time.time() + elapsed = now - started + self._status.add_per_server_time(peerid, "privkey", elapsed) self._queries_outstanding.discard(peerid) if not self._need_privkey: return @@ -769,6 +853,12 @@ class ServermapUpdater: if not self._running: return self._running = False + elapsed = time.time() - self._started + self._status.timings["total"] = elapsed + self._status.set_progress(1.0) + self._status.set_status("Done") + self._status.set_active(False) + self._servermap.last_update_mode = self.mode self._servermap.last_update_time = self._started # the servermap will not be touched after this @@ -779,3 +869,4 @@ class ServermapUpdater: self.log("fatal error", failure=f, level=log.WEIRD) self._done_deferred.errback(f) + diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index b49defe6..620f26e6 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -168,6 +168,8 @@ class FakeClient: pass def notify_publish(self, p): pass + def notify_mapupdate(self, u): + pass def create_node_from_uri(self, u): u = IURI(u) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 565dfe6b..00ae1caa 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -1291,10 +1291,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): for ul in self.clients[0].list_recent_uploads(): if ul.get_size() > 200: self._up_status = ul.get_counter() - #rs = self.clients[0].list_recent_retrieve()[0] - #self._retrieve_status = rs.get_counter() - #ps = self.clients[0].list_recent_publish()[0] - #self._publish_status = ps.get_counter() + rs = self.clients[0].list_recent_retrieve()[0] + self._retrieve_status = rs.get_counter() + ps = self.clients[0].list_recent_publish()[0] + self._publish_status = ps.get_counter() # and that there are some upload- and download- status pages return self.GET("status/up-%d" % self._up_status) @@ -1304,10 +1304,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): d.addCallback(_got_up) def _got_down(res): return self.GET("status/publish-%d" % self._publish_status) - #d.addCallback(_got_down) # TODO: disabled during #303 refactoring + d.addCallback(_got_down) def _got_publish(res): return self.GET("status/retrieve-%d" % self._retrieve_status) - #d.addCallback(_got_publish) + d.addCallback(_got_publish) # check that the helper status page exists d.addCallback(lambda res: diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index cbe20864..c9a7b96d 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -85,6 +85,10 @@ class FakeClient(service.MultiService): return [] def list_active_retrieve(self): return [] + def list_active_mapupdate(self): + return [] + def list_recent_mapupdate(self): + return [] def list_recent_uploads(self): return self._all_upload_status diff --git a/src/allmydata/web/map-update-status.xhtml b/src/allmydata/web/map-update-status.xhtml new file mode 100644 index 00000000..b51896b1 --- /dev/null +++ b/src/allmydata/web/map-update-status.xhtml @@ -0,0 +1,38 @@ + + + AllMyData - Tahoe - Mutable File Servermap Update Status + + + + + + +

Mutable File Servermap Update Status

+ + + +

Update Results

+ + +
Return to the Welcome Page
+ + diff --git a/src/allmydata/web/status.py b/src/allmydata/web/status.py index 2e778883..0b51b2b7 100644 --- a/src/allmydata/web/status.py +++ b/src/allmydata/web/status.py @@ -7,7 +7,7 @@ from allmydata.util import base32, idlib from allmydata.web.common import IClient, getxmlfile, abbreviate_time, \ abbreviate_rate, abbreviate_size, get_arg from allmydata.interfaces import IUploadStatus, IDownloadStatus, \ - IPublishStatus, IRetrieveStatus + IPublishStatus, IRetrieveStatus, IServermapUpdaterStatus def plural(sequence_or_length): if isinstance(sequence_or_length, int): @@ -623,6 +623,81 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin): l[T.li["[%s]: %s" % (peerid_s, times_s)]] return T.li["Per-Server Response Times: ", l] +class MapupdateStatusPage(rend.Page, RateAndTimeMixin): + docFactory = getxmlfile("map-update-status.xhtml") + + def __init__(self, data): + rend.Page.__init__(self, data) + self.update_status = data + + def render_started(self, ctx, data): + TIME_FORMAT = "%H:%M:%S %d-%b-%Y" + started_s = time.strftime(TIME_FORMAT, + time.localtime(data.get_started())) + return started_s + + def render_si(self, ctx, data): + si_s = base32.b2a_or_none(data.get_storage_index()) + if si_s is None: + si_s = "(None)" + return si_s + + def render_helper(self, ctx, data): + return {True: "Yes", + False: "No"}[data.using_helper()] + + def render_progress(self, ctx, data): + progress = data.get_progress() + # TODO: make an ascii-art bar + return "%.1f%%" % (100.0 * progress) + + def render_status(self, ctx, data): + return data.get_status() + + def render_problems(self, ctx, data): + problems = data.problems + if not problems: + return "" + l = T.ul() + for peerid in sorted(problems.keys()): + peerid_s = idlib.shortnodeid_b2a(peerid) + l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]] + return ctx.tag["Server Problems:", l] + + def render_privkey_from(self, ctx, data): + peerid = data.get_privkey_from() + if peerid: + return ctx.tag["Got privkey from: [%s]" + % idlib.shortnodeid_b2a(peerid)] + else: + return "" + + def data_time_total(self, ctx, data): + return self.update_status.timings.get("total") + + def data_time_setup(self, ctx, data): + return self.update_status.timings.get("setup") + + def data_time_cumulative_verify(self, ctx, data): + return self.update_status.timings.get("cumulative_verify") + + def render_server_timings(self, ctx, data): + per_server = self.update_status.timings.get("per_server") + if not per_server: + return "" + l = T.ul() + for peerid in sorted(per_server.keys()): + peerid_s = idlib.shortnodeid_b2a(peerid) + times = [] + for op,t in per_server[peerid]: + if op == "query": + times.append( self.render_time(None, t) ) + else: + times.append( "(" + self.render_time(None, t) + ")" ) + times_s = ", ".join(times) + l[T.li["[%s]: %s" % (peerid_s, times_s)]] + return T.li["Per-Server Response Times: ", l] + class Status(rend.Page): docFactory = getxmlfile("status.xhtml") @@ -632,6 +707,7 @@ class Status(rend.Page): client = IClient(ctx) active = (client.list_active_uploads() + client.list_active_downloads() + + client.list_active_mapupdate() + client.list_active_publish() + client.list_active_retrieve() + client.list_active_helper_statuses()) @@ -641,6 +717,7 @@ class Status(rend.Page): client = IClient(ctx) recent = [o for o in (client.list_recent_uploads() + client.list_recent_downloads() + + client.list_recent_mapupdate() + client.list_recent_publish() + client.list_recent_retrieve() + client.list_recent_helper_statuses()) @@ -688,11 +765,15 @@ class Status(rend.Page): link = "publish-%d" % data.get_counter() ctx.fillSlots("type", "publish") ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) - else: - assert IRetrieveStatus.providedBy(data) + elif IRetrieveStatus.providedBy(data): ctx.fillSlots("type", "retrieve") link = "retrieve-%d" % data.get_counter() ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) + else: + assert IServermapUpdaterStatus.providedBy(data) + ctx.fillSlots("type", "mapupdate %s" % data.get_mode()) + link = "mapupdate-%d" % data.get_counter() + ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) ctx.fillSlots("status", T.a(href=link)[s.get_status()]) return ctx.tag @@ -723,6 +804,14 @@ class Status(rend.Page): s = d.get_download_status() if s.get_counter() == count: return DownloadStatusPage(s) + if stype == "mapupdate": + for s in client.list_recent_mapupdate(): + if s.get_counter() == count: + return MapupdateStatusPage(s) + for p in client.list_all_mapupdate(): + s = p.get_status() + if s.get_counter() == count: + return MapupdateStatusPage(s) if stype == "publish": for s in client.list_recent_publish(): if s.get_counter() == count: