self.getServiceNamed("mutable-watcher").notify_publish(publish_status)
def notify_retrieve(self, retrieve_status):
self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
+ def notify_mapupdate(self, update_status):
+ self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
def create_empty_dirnode(self):
n = NewDirectoryNode(self)
downloader = self.getServiceNamed("downloader")
return downloader.list_recent_downloads()
+ def list_all_mapupdate(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_all_mapupdate()
+ def list_active_mapupdate(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_active_mapupdate()
+ def list_recent_mapupdate(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_recent_mapupdate()
+
def list_all_publish(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_all_publish()
that number. This provides a handle to this particular download, so a
web page can generate a suitable hyperlink."""
+class IServermapUpdaterStatus(Interface):
+ pass
class IPublishStatus(Interface):
pass
class IRetrieveStatus(Interface):
def update_servermap(self, old_map=None, mode=MODE_READ):
servermap = old_map or ServerMap()
d = self.obtain_lock()
- d.addCallback(lambda res:
- ServermapUpdater(self, servermap, mode).update())
+ d.addCallback(lambda res: self._update_servermap(servermap, mode))
d.addBoth(self.release_lock)
return d
verifier = self.get_verifier()
return self._client.getServiceNamed("checker").check(verifier)
+ def _update_servermap(self, old_map, mode):
+ u = ServermapUpdater(self, old_map, mode)
+ self._client.notify_mapupdate(u.get_status())
+ return u.update()
def _retrieve(self, servermap, verinfo):
r = Retrieve(self, servermap, verinfo)
class MutableWatcher(service.MultiService):
+ MAX_MAPUPDATE_STATUSES = 20
MAX_PUBLISH_STATUSES = 20
MAX_RETRIEVE_STATUSES = 20
name = "mutable-watcher"
def __init__(self, stats_provider=None):
service.MultiService.__init__(self)
self.stats_provider = stats_provider
+ self._all_mapupdate_status = weakref.WeakKeyDictionary()
+ self._recent_mapupdate_status = []
self._all_publish_status = weakref.WeakKeyDictionary()
self._recent_publish_status = []
self._all_retrieve_status = weakref.WeakKeyDictionary()
self._recent_retrieve_status = []
+
+ def notify_mapupdate(self, p):
+ self._all_mapupdate_status[p] = None
+ self._recent_mapupdate_status.append(p)
+ while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
+ self._recent_mapupdate_status.pop(0)
+
+ def list_all_mapupdate(self):
+ return self._all_mapupdate_status.keys()
+ def list_active_mapupdate(self):
+ return [p for p in self._all_mapupdate_status.keys() if p.get_active()]
+ def list_recent_mapupdate(self):
+ return self._recent_mapupdate_status
+
+
def notify_publish(self, p):
self._all_publish_status[p] = None
self._recent_publish_status.append(p)
import sys, time
+from zope.interface import implements
+from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.eventual import eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata import storage
+from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa
from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share
+class UpdateStatus:
+ implements(IServermapUpdaterStatus)
+ statusid_counter = count(0)
+ def __init__(self):
+ self.timings = {}
+ self.timings["per_server"] = {}
+ self.timings["cumulative_verify"] = 0.0
+ self.privkey_from = None
+ self.problems = {}
+ self.active = True
+ self.storage_index = None
+ self.mode = "?"
+ self.status = "Not started"
+ self.progress = 0.0
+ self.counter = self.statusid_counter.next()
+ self.started = time.time()
+
+ def add_per_server_time(self, peerid, op, elapsed):
+ assert op in ("query", "privkey")
+ if peerid not in self.timings["per_server"]:
+ self.timings["per_server"][peerid] = []
+ self.timings["per_server"][peerid].append((op,elapsed))
+
+ def get_started(self):
+ return self.started
+ def get_storage_index(self):
+ return self.storage_index
+ def get_mode(self):
+ return self.mode
+ def get_servermap(self):
+ return self.servermap
+ def get_privkey_from(self):
+ return self.privkey_from
+ def using_helper(self):
+ return False
+ def get_size(self):
+ return "-NA-"
+ def get_status(self):
+ return self.status
+ def get_progress(self):
+ return self.progress
+ def get_active(self):
+ return self.active
+ def get_counter(self):
+ return self.counter
+
+ def set_storage_index(self, si):
+ self.storage_index = si
+ def set_mode(self, mode):
+ self.mode = mode
+ def set_privkey_from(self, peerid):
+ self.privkey_from = peerid
+ def set_status(self, status):
+ self.status = status
+ def set_progress(self, value):
+ self.progress = value
+ def set_active(self, value):
+ self.active = value
+
class ServerMap:
"""I record the placement of mutable shares.
self._storage_index = filenode.get_storage_index()
self._last_failure = None
+ self._status = UpdateStatus()
+ self._status.set_storage_index(self._storage_index)
+ self._status.set_progress(0.0)
+ self._status.set_mode(mode)
+
# how much data should we read?
# * if we only need the checkstring, then [0:75]
# * if we need to validate the checkstring sig, then [543ish:799ish]
self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
si=prefix, mode=mode)
+ def get_status(self):
+ return self._status
+
def log(self, *args, **kwargs):
if "parent" not in kwargs:
kwargs["parent"] = self._log_number
def update(self):
"""Update the servermap to reflect current conditions. Returns a
Deferred that fires with the servermap once the update has finished."""
+ self._started = time.time()
+ self._status.set_active(True)
# self._valid_versions is a set of validated verinfo tuples. We just
# use it to remember which versions had valid signatures, so we can
# be retrievable, and to make the eventual data download faster.
self.versionmap = DictOfSets()
- self._started = time.time()
self._done_deferred = defer.Deferred()
# first, which peers should be talk to? Any that were in our old
# enough responses)
self._send_initial_requests(initial_peers_to_query)
+ self._status.timings["setup"] = time.time() - self._started
return self._done_deferred
def _build_initial_querylist(self):
return initial_peers_to_query, must_query
def _send_initial_requests(self, peerlist):
+ self._status.set_status("Sending %d initial queries" % len(peerlist))
self._queries_outstanding = set()
self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
dl = []
peerid=idlib.shortnodeid_b2a(peerid),
numshares=len(datavs),
level=log.NOISY)
+ now = time.time()
+ elapsed = now - started
+ self._status.add_per_server_time(peerid, "query", elapsed)
self._queries_outstanding.discard(peerid)
self._must_query.discard(peerid)
self._queries_completed += 1
self._servermap.problems.append(f)
pass
+ self._status.timings["cumulative_verify"] += (time.time() - now)
+
if self._need_privkey and last_verinfo:
# send them a request for the privkey. We send one request per
# server.
self._queries_outstanding.add(peerid)
readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
ss = self._servermap.connections[peerid]
+ privkey_started = time.time()
d = self._do_read(ss, peerid, self._storage_index,
[last_shnum], readv)
- d.addCallback(self._got_privkey_results, peerid, last_shnum)
+ d.addCallback(self._got_privkey_results, peerid, last_shnum,
+ privkey_started)
d.addErrback(self._privkey_query_failed, peerid, last_shnum)
d.addErrback(log.err)
d.addCallback(self._check_for_done)
self._node._populate_encprivkey(enc_privkey)
self._node._populate_privkey(privkey)
self._need_privkey = False
+ self._status.set_privkey_from(peerid)
def _query_failed(self, f, peerid):
self._queries_completed += 1
self._last_failure = f
- def _got_privkey_results(self, datavs, peerid, shnum):
+ def _got_privkey_results(self, datavs, peerid, shnum, started):
+ now = time.time()
+ elapsed = now - started
+ self._status.add_per_server_time(peerid, "privkey", elapsed)
self._queries_outstanding.discard(peerid)
if not self._need_privkey:
return
if not self._running:
return
self._running = False
+ elapsed = time.time() - self._started
+ self._status.timings["total"] = elapsed
+ self._status.set_progress(1.0)
+ self._status.set_status("Done")
+ self._status.set_active(False)
+
self._servermap.last_update_mode = self.mode
self._servermap.last_update_time = self._started
# the servermap will not be touched after this
self.log("fatal error", failure=f, level=log.WEIRD)
self._done_deferred.errback(f)
+
pass
def notify_publish(self, p):
pass
+ def notify_mapupdate(self, u):
+ pass
def create_node_from_uri(self, u):
u = IURI(u)
for ul in self.clients[0].list_recent_uploads():
if ul.get_size() > 200:
self._up_status = ul.get_counter()
- #rs = self.clients[0].list_recent_retrieve()[0]
- #self._retrieve_status = rs.get_counter()
- #ps = self.clients[0].list_recent_publish()[0]
- #self._publish_status = ps.get_counter()
+ rs = self.clients[0].list_recent_retrieve()[0]
+ self._retrieve_status = rs.get_counter()
+ ps = self.clients[0].list_recent_publish()[0]
+ self._publish_status = ps.get_counter()
# and that there are some upload- and download- status pages
return self.GET("status/up-%d" % self._up_status)
d.addCallback(_got_up)
def _got_down(res):
return self.GET("status/publish-%d" % self._publish_status)
- #d.addCallback(_got_down) # TODO: disabled during #303 refactoring
+ d.addCallback(_got_down)
def _got_publish(res):
return self.GET("status/retrieve-%d" % self._retrieve_status)
- #d.addCallback(_got_publish)
+ d.addCallback(_got_publish)
# check that the helper status page exists
d.addCallback(lambda res:
return []
def list_active_retrieve(self):
return []
+ def list_active_mapupdate(self):
+ return []
+ def list_recent_mapupdate(self):
+ return []
def list_recent_uploads(self):
return self._all_upload_status
--- /dev/null
+<html xmlns:n="http://nevow.com/ns/nevow/0.1">
+ <head>
+ <title>AllMyData - Tahoe - Mutable File Servermap Update Status</title>
+ <!-- <link href="http://www.allmydata.com/common/css/styles.css"
+ rel="stylesheet" type="text/css"/> -->
+ <link href="/webform_css" rel="stylesheet" type="text/css"/>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+ </head>
+ <body>
+
+<h1>Mutable File Servermap Update Status</h1>
+
+<ul>
+ <li>Started: <span n:render="started"/></li>
+ <li>Storage Index: <span n:render="si"/></li>
+ <li>Helper?: <span n:render="helper"/></li>
+ <li>Progress: <span n:render="progress"/></li>
+ <li>Status: <span n:render="status"/></li>
+</ul>
+
+<h2>Update Results</h2>
+<ul>
+ <li n:render="problems" />
+ <li>Timings:</li>
+ <ul>
+ <li>Total: <span n:render="time" n:data="time_total" /></li>
+ <ul>
+ <li>Setup: <span n:render="time" n:data="time_setup" /></li>
+ <li n:render="privkey_from" />
+ <li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
+ </ul>
+ <li n:render="server_timings" />
+ </ul>
+</ul>
+
+<div>Return to the <a href="/">Welcome Page</a></div>
+
+</body></html>
from allmydata.web.common import IClient, getxmlfile, abbreviate_time, \
abbreviate_rate, abbreviate_size, get_arg
from allmydata.interfaces import IUploadStatus, IDownloadStatus, \
- IPublishStatus, IRetrieveStatus
+ IPublishStatus, IRetrieveStatus, IServermapUpdaterStatus
def plural(sequence_or_length):
if isinstance(sequence_or_length, int):
l[T.li["[%s]: %s" % (peerid_s, times_s)]]
return T.li["Per-Server Response Times: ", l]
+class MapupdateStatusPage(rend.Page, RateAndTimeMixin):
+ docFactory = getxmlfile("map-update-status.xhtml")
+
+ def __init__(self, data):
+ rend.Page.__init__(self, data)
+ self.update_status = data
+
+ def render_started(self, ctx, data):
+ TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
+ started_s = time.strftime(TIME_FORMAT,
+ time.localtime(data.get_started()))
+ return started_s
+
+ def render_si(self, ctx, data):
+ si_s = base32.b2a_or_none(data.get_storage_index())
+ if si_s is None:
+ si_s = "(None)"
+ return si_s
+
+ def render_helper(self, ctx, data):
+ return {True: "Yes",
+ False: "No"}[data.using_helper()]
+
+ def render_progress(self, ctx, data):
+ progress = data.get_progress()
+ # TODO: make an ascii-art bar
+ return "%.1f%%" % (100.0 * progress)
+
+ def render_status(self, ctx, data):
+ return data.get_status()
+
+ def render_problems(self, ctx, data):
+ problems = data.problems
+ if not problems:
+ return ""
+ l = T.ul()
+ for peerid in sorted(problems.keys()):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]]
+ return ctx.tag["Server Problems:", l]
+
+ def render_privkey_from(self, ctx, data):
+ peerid = data.get_privkey_from()
+ if peerid:
+ return ctx.tag["Got privkey from: [%s]"
+ % idlib.shortnodeid_b2a(peerid)]
+ else:
+ return ""
+
+ def data_time_total(self, ctx, data):
+ return self.update_status.timings.get("total")
+
+ def data_time_setup(self, ctx, data):
+ return self.update_status.timings.get("setup")
+
+ def data_time_cumulative_verify(self, ctx, data):
+ return self.update_status.timings.get("cumulative_verify")
+
+ def render_server_timings(self, ctx, data):
+ per_server = self.update_status.timings.get("per_server")
+ if not per_server:
+ return ""
+ l = T.ul()
+ for peerid in sorted(per_server.keys()):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ times = []
+ for op,t in per_server[peerid]:
+ if op == "query":
+ times.append( self.render_time(None, t) )
+ else:
+ times.append( "(" + self.render_time(None, t) + ")" )
+ times_s = ", ".join(times)
+ l[T.li["[%s]: %s" % (peerid_s, times_s)]]
+ return T.li["Per-Server Response Times: ", l]
+
class Status(rend.Page):
docFactory = getxmlfile("status.xhtml")
client = IClient(ctx)
active = (client.list_active_uploads() +
client.list_active_downloads() +
+ client.list_active_mapupdate() +
client.list_active_publish() +
client.list_active_retrieve() +
client.list_active_helper_statuses())
client = IClient(ctx)
recent = [o for o in (client.list_recent_uploads() +
client.list_recent_downloads() +
+ client.list_recent_mapupdate() +
client.list_recent_publish() +
client.list_recent_retrieve() +
client.list_recent_helper_statuses())
link = "publish-%d" % data.get_counter()
ctx.fillSlots("type", "publish")
ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
- else:
- assert IRetrieveStatus.providedBy(data)
+ elif IRetrieveStatus.providedBy(data):
ctx.fillSlots("type", "retrieve")
link = "retrieve-%d" % data.get_counter()
ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
+ else:
+ assert IServermapUpdaterStatus.providedBy(data)
+ ctx.fillSlots("type", "mapupdate %s" % data.get_mode())
+ link = "mapupdate-%d" % data.get_counter()
+ ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
ctx.fillSlots("status", T.a(href=link)[s.get_status()])
return ctx.tag
s = d.get_download_status()
if s.get_counter() == count:
return DownloadStatusPage(s)
+ if stype == "mapupdate":
+ for s in client.list_recent_mapupdate():
+ if s.get_counter() == count:
+ return MapupdateStatusPage(s)
+ for p in client.list_all_mapupdate():
+ s = p.get_status()
+ if s.get_counter() == count:
+ return MapupdateStatusPage(s)
if stype == "publish":
for s in client.list_recent_publish():
if s.get_counter() == count: