From: Brian Warner <warner@allmydata.com>
Date: Thu, 17 Apr 2008 00:49:06 +0000 (-0700)
Subject: mutable WIP: re-enable publish/retrieve status
X-Git-Tag: allmydata-tahoe-1.1.0~238
X-Git-Url: https://git.rkrishnan.org/pf/content/en/seg/class-simplejson.JSONDecoder-index.html?a=commitdiff_plain;h=749c42fa2c7a0648983351a313be4648086698e4;p=tahoe-lafs%2Ftahoe-lafs.git

mutable WIP: re-enable publish/retrieve status
---

diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index 134ed9fd..2cd5d484 100644
--- a/src/allmydata/client.py
+++ b/src/allmydata/client.py
@@ -300,10 +300,10 @@ class Client(node.Node, testutil.PollMixin):
         assert IMutableFileURI.providedBy(u), u
         return MutableFileNode(self).init_from_uri(u)
 
-    def notify_publish(self, p):
-        self.getServiceNamed("mutable-watcher").notify_publish(p)
-    def notify_retrieve(self, r):
-        self.getServiceNamed("mutable-watcher").notify_retrieve(r)
+    def notify_publish(self, publish_status):
+        self.getServiceNamed("mutable-watcher").notify_publish(publish_status)
+    def notify_retrieve(self, retrieve_status):
+        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
 
     def create_empty_dirnode(self):
         n = NewDirectoryNode(self)
diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py
index 52389297..c19e0396 100644
--- a/src/allmydata/mutable/node.py
+++ b/src/allmydata/mutable/node.py
@@ -22,8 +22,6 @@ from retrieve import Retrieve
 
 class MutableFileNode:
     implements(IMutableFileNode)
-    publish_class = Publish
-    retrieve_class = Retrieve
     SIGNATURE_KEY_SIZE = 2048
     DEFAULT_ENCODING = (3, 10)
 
@@ -90,7 +88,7 @@ class MutableFileNode:
             # nobody knows about us yet"
             self._current_seqnum = 0
             self._current_roothash = "\x00"*32
-            return self._publish(initial_contents)
+            return self._publish(None, initial_contents)
         d.addCallback(_generated)
         return d
 
@@ -225,17 +223,13 @@ class MutableFileNode:
     def download_version(self, servermap, versionid):
         """Returns a Deferred that fires with a string."""
         d = self.obtain_lock()
-        d.addCallback(lambda res:
-                      Retrieve(self, servermap, versionid).download())
+        d.addCallback(lambda res: self._retrieve(servermap, versionid))
         d.addBoth(self.release_lock)
         return d
 
-    def publish(self, servermap, newdata):
-        assert self._pubkey, "update_servermap must be called before publish"
+    def publish(self, servermap, new_contents):
         d = self.obtain_lock()
-        d.addCallback(lambda res: Publish(self, servermap).publish(newdata))
-        # p = self.publish_class(self)
-        # self._client.notify_publish(p)
+        d.addCallback(lambda res: self._publish(servermap, new_contents))
         d.addBoth(self.release_lock)
         return d
 
@@ -269,16 +263,11 @@ class MutableFileNode:
         verifier = self.get_verifier()
         return self._client.getServiceNamed("checker").check(verifier)
 
-    def download(self, target):
-        # fake it. TODO: make this cleaner.
-        d = self.download_to_data()
-        def _done(data):
-            target.open(len(data))
-            target.write(data)
-            target.close()
-            return target.finish()
-        d.addCallback(_done)
-        return d
+
+    def _retrieve(self, servermap, verinfo):
+        r = Retrieve(self, servermap, verinfo)
+        self._client.notify_retrieve(r.get_status())
+        return r.download()
 
     def _update_and_retrieve_best(self, old_map=None, mode=MODE_READ):
         d = self.update_servermap(old_map=old_map, mode=mode)
@@ -306,22 +295,33 @@ class MutableFileNode:
         d.addBoth(self.release_lock)
         return d
 
-    def _publish(self, initial_contents):
-        p = Publish(self, None)
-        d = p.publish(initial_contents)
-        d.addCallback(lambda res: self)
+    def download(self, target):
+        # fake it. TODO: make this cleaner.
+        d = self.download_to_data()
+        def _done(data):
+            target.open(len(data))
+            target.write(data)
+            target.close()
+            return target.finish()
+        d.addCallback(_done)
         return d
 
-    def update(self, newdata):
+
+    def _publish(self, servermap, new_contents):
+        assert self._pubkey, "update_servermap must be called before publish"
+        p = Publish(self, servermap)
+        self._client.notify_publish(p.get_status())
+        return p.publish(new_contents)
+
+    def update(self, new_contents):
         d = self.obtain_lock()
         d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE))
-        d.addCallback(lambda smap:
-                      Publish(self, smap).publish(newdata))
+        d.addCallback(self._publish, new_contents)
         d.addBoth(self.release_lock)
         return d
 
-    def overwrite(self, newdata):
-        return self.update(newdata)
+    def overwrite(self, new_contents):
+        return self.update(new_contents)
 
 
 class MutableWatcher(service.MultiService):
@@ -332,42 +332,40 @@ class MutableWatcher(service.MultiService):
     def __init__(self, stats_provider=None):
         service.MultiService.__init__(self)
         self.stats_provider = stats_provider
-        self._all_publish = weakref.WeakKeyDictionary()
+        self._all_publish_status = weakref.WeakKeyDictionary()
         self._recent_publish_status = []
-        self._all_retrieve = weakref.WeakKeyDictionary()
+        self._all_retrieve_status = weakref.WeakKeyDictionary()
         self._recent_retrieve_status = []
 
     def notify_publish(self, p):
-        self._all_publish[p] = None
-        self._recent_publish_status.append(p.get_status())
+        self._all_publish_status[p] = None
+        self._recent_publish_status.append(p)
         if self.stats_provider:
             self.stats_provider.count('mutable.files_published', 1)
-            #self.stats_provider.count('mutable.bytes_published', p._node.get_size())
+            self.stats_provider.count('mutable.bytes_published', p.get_size())
         while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
             self._recent_publish_status.pop(0)
 
     def list_all_publish(self):
-        return self._all_publish.keys()
+        return self._all_publish_status.keys()
     def list_active_publish(self):
-        return [p.get_status() for p in self._all_publish.keys()
-                if p.get_status().get_active()]
+        return [p for p in self._all_publish_status.keys() if p.get_active()]
     def list_recent_publish(self):
         return self._recent_publish_status
 
 
     def notify_retrieve(self, r):
-        self._all_retrieve[r] = None
-        self._recent_retrieve_status.append(r.get_status())
+        self._all_retrieve_status[r] = None
+        self._recent_retrieve_status.append(r)
         if self.stats_provider:
             self.stats_provider.count('mutable.files_retrieved', 1)
-            #self.stats_provider.count('mutable.bytes_retrieved', r._node.get_size())
+            self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
         while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
             self._recent_retrieve_status.pop(0)
 
     def list_all_retrieve(self):
-        return self._all_retrieve.keys()
+        return self._all_retrieve_status.keys()
     def list_active_retrieve(self):
-        return [p.get_status() for p in self._all_retrieve.keys()
-                if p.get_status().get_active()]
+        return [p for p in self._all_retrieve_status.keys() if p.get_active()]
     def list_recent_retrieve(self):
         return self._recent_retrieve_status
diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py
index b53c639a..acbe5cee 100644
--- a/src/allmydata/mutable/publish.py
+++ b/src/allmydata/mutable/publish.py
@@ -4,10 +4,12 @@ import os, struct, time
 from itertools import count
 from zope.interface import implements
 from twisted.internet import defer
+from twisted.python import failure
 from allmydata.interfaces import IPublishStatus
 from allmydata.util import base32, hashutil, mathutil, idlib, log
 from allmydata import hashtree, codec, storage
 from pycryptopp.cipher.aes import AES
+from foolscap.eventual import eventually
 
 from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
 from servermap import ServerMap
@@ -19,27 +21,23 @@ class PublishStatus:
     statusid_counter = count(0)
     def __init__(self):
         self.timings = {}
-        self.timings["per_server"] = {}
-        self.privkey_from = None
-        self.peers_queried = None
-        self.sharemap = None # DictOfSets
+        self.timings["send_per_server"] = {}
+        self.servermap = None
         self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
         self.encoding = ("?", "?")
-        self.initial_read_size = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
         self.counter = self.statusid_counter.next()
         self.started = time.time()
 
-    def add_per_server_time(self, peerid, op, elapsed):
-        assert op in ("read", "write")
-        if peerid not in self.timings["per_server"]:
-            self.timings["per_server"][peerid] = []
-        self.timings["per_server"][peerid].append((op,elapsed))
+    def add_per_server_time(self, peerid, elapsed):
+        if peerid not in self.timings["send_per_server"]:
+            self.timings["send_per_server"][peerid] = []
+        self.timings["send_per_server"][peerid].append(elapsed)
 
     def get_started(self):
         return self.started
@@ -49,6 +47,8 @@ class PublishStatus:
         return self.encoding
     def using_helper(self):
         return self.helper
+    def get_servermap(self):
+        return self.servermap
     def get_size(self):
         return self.size
     def get_status(self):
@@ -64,6 +64,8 @@ class PublishStatus:
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
+    def set_servermap(self, servermap):
+        self.servermap = servermap
     def set_encoding(self, k, n):
         self.encoding = (k, n)
     def set_size(self, size):
@@ -102,6 +104,13 @@ class Publish:
         self._log_number = num
         self._running = True
 
+        self._status = PublishStatus()
+        self._status.set_storage_index(self._storage_index)
+        self._status.set_helper(False)
+        self._status.set_progress(0.0)
+        self._status.set_active(True)
+        self._status.set_servermap(servermap)
+
     def log(self, *args, **kwargs):
         if 'parent' not in kwargs:
             kwargs['parent'] = self._log_number
@@ -129,6 +138,9 @@ class Publish:
         # 5: when enough responses are back, we're done
 
         self.log("starting publish, datalen is %s" % len(newdata))
+        self._status.set_size(len(newdata))
+        self._status.set_status("Started")
+        self._started = time.time()
 
         self.done_deferred = defer.Deferred()
 
@@ -160,6 +172,8 @@ class Publish:
         assert self.required_shares is not None
         self.total_shares = self._node.get_total_shares()
         assert self.total_shares is not None
+        self._status.set_encoding(self.required_shares, self.total_shares)
+
         self._pubkey = self._node.get_pubkey()
         assert self._pubkey
         self._privkey = self._node.get_privkey()
@@ -209,8 +223,13 @@ class Publish:
         # create the shares. We'll discard these as they are delivered. SMDF:
         # we're allowed to hold everything in memory.
 
+        self._status.timings["setup"] = time.time() - self._started
         d = self._encrypt_and_encode()
         d.addCallback(self._generate_shares)
+        def _start_pushing(res):
+            self._started_pushing = time.time()
+            return res
+        d.addCallback(_start_pushing)
         d.addCallback(self.loop) # trigger delivery
         d.addErrback(self._fatal_error)
 
@@ -233,11 +252,22 @@ class Publish:
         self.log("error during loop", failure=f, level=log.SCARY)
         self._done(f)
 
+    def _update_status(self):
+        self._status.set_status("Sending Shares: %d placed out of %d, "
+                                "%d messages outstanding" %
+                                (len(self.placed),
+                                 len(self.goal),
+                                 len(self.outstanding)))
+        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
+
     def loop(self, ignored=None):
         self.log("entering loop", level=log.NOISY)
+        if not self._running:
+            return
         self.update_goal()
         # how far are we from our goal?
         needed = self.goal - self.placed - self.outstanding
+        self._update_status()
 
         if needed:
             # we need to send out new shares
@@ -258,6 +288,9 @@ class Publish:
         # no queries outstanding, no placements needed: we're done
         self.log("no queries outstanding, no placements needed: done",
                  level=log.OPERATIONAL)
+        now = time.time()
+        elapsed = now - self._started_pushing
+        self._status.timings["push"] = elapsed
         return self._done(None)
 
     def log_goal(self, goal):
@@ -331,19 +364,21 @@ class Publish:
         # shares that we care about.
         self.log("_encrypt_and_encode")
 
-        #started = time.time()
+        self._status.set_status("Encrypting")
+        started = time.time()
 
         key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
         enc = AES(key)
         crypttext = enc.process(self.newdata)
         assert len(crypttext) == len(self.newdata)
 
-        #now = time.time()
-        #self._status.timings["encrypt"] = now - started
-        #started = now
+        now = time.time()
+        self._status.timings["encrypt"] = now - started
+        started = now
 
         # now apply FEC
 
+        self._status.set_status("Encoding")
         fec = codec.CRSEncoder()
         fec.set_params(self.segment_size,
                        self.required_shares, self.total_shares)
@@ -358,8 +393,8 @@ class Publish:
 
         d = fec.encode(crypttext_pieces)
         def _done_encoding(res):
-            #elapsed = time.time() - started
-            #self._status.timings["encode"] = elapsed
+            elapsed = time.time() - started
+            self._status.timings["encode"] = elapsed
             return res
         d.addCallback(_done_encoding)
         return d
@@ -367,7 +402,8 @@ class Publish:
     def _generate_shares(self, shares_and_shareids):
         # this sets self.shares and self.root_hash
         self.log("_generate_shares")
-        #started = time.time()
+        self._status.set_status("Generating Shares")
+        started = time.time()
 
         # we should know these by now
         privkey = self._privkey
@@ -413,9 +449,9 @@ class Publish:
         # then they all share the same encprivkey at the end. The sizes
         # of everything are the same for all shares.
 
-        #sign_started = time.time()
+        sign_started = time.time()
         signature = privkey.sign(prefix)
-        #self._status.timings["sign"] = time.time() - sign_started
+        self._status.timings["sign"] = time.time() - sign_started
 
         verification_key = pubkey.serialize()
 
@@ -429,8 +465,8 @@ class Publish:
                                      all_shares[shnum],
                                      encprivkey)
             final_shares[shnum] = final_share
-        #elapsed = time.time() - started
-        #self._status.timings["pack"] = elapsed
+        elapsed = time.time() - started
+        self._status.timings["pack"] = elapsed
         self.shares = final_shares
         self.root_hash = root_hash
 
@@ -449,7 +485,6 @@ class Publish:
 
     def _send_shares(self, needed):
         self.log("_send_shares")
-        #started = time.time()
 
         # we're finally ready to send out our shares. If we encounter any
         # surprises here, it's because somebody else is writing at the same
@@ -547,6 +582,7 @@ class Publish:
             d.addErrback(self._fatal_error)
             dl.append(d)
 
+        self._update_status()
         return defer.DeferredList(dl) # purely for testing
 
     def _do_testreadwrite(self, peerid, secrets,
@@ -568,6 +604,10 @@ class Publish:
         for shnum in shnums:
             self.outstanding.discard( (peerid, shnum) )
 
+        now = time.time()
+        elapsed = now - started
+        self._status.add_per_server_time(peerid, elapsed)
+
         wrote, read_data = answer
 
         if not wrote:
@@ -650,13 +690,16 @@ class Publish:
         if not self._running:
             return
         self._running = False
-        #now = time.time()
-        #self._status.timings["total"] = now - self._started
-        #self._status.set_active(False)
-        #self._status.set_status("Done")
-        #self._status.set_progress(1.0)
-        self.done_deferred.callback(res)
-        return None
+        now = time.time()
+        self._status.timings["total"] = now - self._started
+        self._status.set_active(False)
+        if isinstance(res, failure.Failure):
+            self.log("Retrieve done, with failure", failure=res)
+            self._status.set_status("Failed")
+        else:
+            self._status.set_status("Done")
+            self._status.set_progress(1.0)
+        eventually(self.done_deferred.callback, res)
 
     def get_status(self):
         return self._status
diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index 4281ce90..00663f4c 100644
--- a/src/allmydata/mutable/retrieve.py
+++ b/src/allmydata/mutable/retrieve.py
@@ -21,13 +21,11 @@ class RetrieveStatus:
         self.timings = {}
         self.timings["fetch_per_server"] = {}
         self.timings["cumulative_verify"] = 0.0
-        self.sharemap = {}
         self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
         self.encoding = ("?","?")
-        self.search_distance = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
@@ -40,8 +38,6 @@ class RetrieveStatus:
         return self.storage_index
     def get_encoding(self):
         return self.encoding
-    def get_search_distance(self):
-        return self.search_distance
     def using_helper(self):
         return self.helper
     def get_size(self):
@@ -55,14 +51,16 @@ class RetrieveStatus:
     def get_counter(self):
         return self.counter
 
+    def add_fetch_timing(self, peerid, elapsed):
+        if peerid not in self.timings["fetch_per_server"]:
+            self.timings["fetch_per_server"][peerid] = []
+        self.timings["fetch_per_server"][peerid].append(elapsed)
     def set_storage_index(self, si):
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
     def set_encoding(self, k, n):
         self.encoding = (k, n)
-    def set_search_distance(self, value):
-        self.search_distance = value
     def set_size(self, size):
         self.size = size
     def set_status(self, status):
@@ -99,6 +97,19 @@ class Retrieve:
         assert self._node._pubkey
         self.verinfo = verinfo
 
+        self._status = RetrieveStatus()
+        self._status.set_storage_index(self._storage_index)
+        self._status.set_helper(False)
+        self._status.set_progress(0.0)
+        self._status.set_active(True)
+        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+         offsets_tuple) = self.verinfo
+        self._status.set_size(datalength)
+        self._status.set_encoding(k, N)
+
+    def get_status(self):
+        return self._status
+
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_number
@@ -106,6 +117,8 @@ class Retrieve:
 
     def download(self):
         self._done_deferred = defer.Deferred()
+        self._started = time.time()
+        self._status.set_status("Retrieving Shares")
 
         # first, which servers can we use?
         versionmap = self.servermap.make_versionmap()
@@ -165,6 +178,7 @@ class Retrieve:
         self._outstanding_queries[m] = (peerid, shnum, started)
 
         # ask the cache first
+        got_from_cache = False
         datav = []
         #for (offset, length) in readv:
         #    (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
@@ -173,13 +187,14 @@ class Retrieve:
         #        datav.append(data)
         if len(datav) == len(readv):
             self.log("got data from cache")
+            got_from_cache = True
             d = defer.succeed(datav)
         else:
             self.remaining_sharemap[shnum].remove(peerid)
             d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
             d.addCallback(self._fill_cache, readv)
 
-        d.addCallback(self._got_results, m, peerid, started)
+        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
         d.addErrback(self._query_failed, m, peerid)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
@@ -216,7 +231,11 @@ class Retrieve:
         for shnum in list(self.remaining_sharemap.keys()):
             self.remaining_sharemap.discard(shnum, peerid)
 
-    def _got_results(self, datavs, marker, peerid, started):
+    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
+        now = time.time()
+        elapsed = now - started
+        if not got_from_cache:
+            self._status.add_fetch_timing(peerid, elapsed)
         self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
                  shares=len(datavs),
                  peerid=idlib.shortnodeid_b2a(peerid),
@@ -241,6 +260,7 @@ class Retrieve:
                 self.remove_peer(peerid)
                 self.servermap.mark_bad_share(peerid, shnum)
                 self._bad_shares.add( (peerid, shnum) )
+                self._status.problems[peerid] = f
                 self._last_failure = f
                 pass
         # all done!
@@ -284,6 +304,7 @@ class Retrieve:
         self.log(format="query to [%(peerid)s] failed",
                  peerid=idlib.shortnodeid_b2a(peerid),
                  level=log.NOISY)
+        self._status.problems[peerid] = f
         self._outstanding_queries.pop(marker, None)
         if not self._running:
             return
@@ -317,6 +338,10 @@ class Retrieve:
         # to fix it, so the download will fail.
 
         self._decoding = True # avoid reentrancy
+        self._status.set_status("decoding")
+        now = time.time()
+        elapsed = now - self._started
+        self._status.timings["fetch"] = elapsed
 
         d = defer.maybeDeferred(self._decode)
         d.addCallback(self._decrypt, IV, self._node._readkey)
@@ -366,6 +391,7 @@ class Retrieve:
             peerid = list(self.remaining_sharemap[shnum])[0]
             # get_data will remove that peerid from the sharemap, and add the
             # query to self._outstanding_queries
+            self._status.set_status("Retrieving More Shares")
             self.get_data(shnum, peerid)
             needed -= 1
             if not needed:
@@ -400,6 +426,7 @@ class Retrieve:
         return
 
     def _decode(self):
+        started = time.time()
         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
          offsets_tuple) = self.verinfo
 
@@ -423,6 +450,7 @@ class Retrieve:
         self.log("about to decode, shareids=%s" % (shareids,))
         d = defer.maybeDeferred(fec.decode, shares, shareids)
         def _done(buffers):
+            self._status.timings["decode"] = time.time() - started
             self.log(" decode done, %d buffers" % len(buffers))
             segment = "".join(buffers)
             self.log(" joined length %d, datalength %d" %
@@ -438,21 +466,28 @@ class Retrieve:
         return d
 
     def _decrypt(self, crypttext, IV, readkey):
+        self._status.set_status("decrypting")
         started = time.time()
         key = hashutil.ssk_readkey_data_hash(IV, readkey)
         decryptor = AES(key)
         plaintext = decryptor.process(crypttext)
+        self._status.timings["decrypt"] = time.time() - started
         return plaintext
 
     def _done(self, res):
         if not self._running:
             return
         self._running = False
+        self._status.set_active(False)
+        self._status.timings["total"] = time.time() - self._started
         # res is either the new contents, or a Failure
         if isinstance(res, failure.Failure):
             self.log("Retrieve done, with failure", failure=res)
+            self._status.set_status("Failed")
         else:
             self.log("Retrieve done, success!")
+            self._status.set_status("Done")
+            self._status.set_progress(1.0)
             # remember the encoding parameters, use them again next time
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = self.verinfo
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index ccfb50d3..edd2a8d6 100644
--- a/src/allmydata/mutable/servermap.py
+++ b/src/allmydata/mutable/servermap.py
@@ -85,6 +85,13 @@ class ServerMap:
                     for (peerid, shnum)
                     in self.servermap])
 
+    def make_sharemap(self):
+        """Return a dict that maps shnum to a set of peerds that hold it."""
+        sharemap = DictOfSets()
+        for (peerid, shnum) in self.servermap:
+            sharemap.add(shnum, peerid)
+        return sharemap
+
     def make_versionmap(self):
         """Return a dict that maps versionid to sets of (shnum, peerid,
         timestamp) tuples."""
diff --git a/src/allmydata/test/test_keygen.py b/src/allmydata/test/test_keygen.py
index 38c54d3f..a6f354d6 100644
--- a/src/allmydata/test/test_keygen.py
+++ b/src/allmydata/test/test_keygen.py
@@ -25,6 +25,7 @@ class KeyGenService(unittest.TestCase, testutil.PollMixin):
         t.setServiceParent(self.parent)
         t.listenOn("tcp:0")
         t.setLocationAutomatically()
+        return eventual.fireEventually()
 
     def tearDown(self):
         d = self.parent.stopService()
diff --git a/src/allmydata/web/publish-status.xhtml b/src/allmydata/web/publish-status.xhtml
index 9250371b..cb089ee2 100644
--- a/src/allmydata/web/publish-status.xhtml
+++ b/src/allmydata/web/publish-status.xhtml
@@ -22,7 +22,6 @@
 <h2>Retrieve Results</h2>
 <ul>
   <li n:render="encoding" />
-  <li n:render="peers_queried" />
   <li n:render="problems" />
   <li n:render="sharemap" />
   <li>Timings:</li>
@@ -31,12 +30,6 @@
     (<span n:render="rate" n:data="rate_total" />)</li>
     <ul>
       <li>Setup: <span n:render="time" n:data="time_setup" /></li>
-      <li>Initial Version Query: <span n:render="time" n:data="time_query" />
-      (read size <span n:render="string" n:data="initial_read_size"/> bytes)</li>
-      <li>Obtain Privkey: <span n:render="time" n:data="time_privkey" />
-      <ul>
-        <li>Separate Privkey Fetch: <span n:render="time" n:data="time_privkey_fetch" /> <span n:render="privkey_from"/></li>
-      </ul></li>
       <li>Encrypting: <span n:render="time" n:data="time_encrypt" />
       (<span n:render="rate" n:data="rate_encrypt" />)</li>
       <li>Encoding: <span n:render="time" n:data="time_encode" />
diff --git a/src/allmydata/web/retrieve-status.xhtml b/src/allmydata/web/retrieve-status.xhtml
index fe0b9e33..52a7b180 100644
--- a/src/allmydata/web/retrieve-status.xhtml
+++ b/src/allmydata/web/retrieve-status.xhtml
@@ -22,19 +22,14 @@
 <h2>Retrieve Results</h2>
 <ul>
   <li n:render="encoding" />
-  <li n:render="search_distance" />
   <li n:render="problems" />
   <li>Timings:</li>
   <ul>
     <li>Total: <span n:render="time" n:data="time_total" />
     (<span n:render="rate" n:data="rate_total" />)</li>
     <ul>
-      <li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
       <li>Fetching: <span n:render="time" n:data="time_fetch" />
-      (<span n:render="rate" n:data="rate_fetch" />)
-      <ul>
-        <li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
-      </ul></li>
+      (<span n:render="rate" n:data="rate_fetch" />)</li>
       <li>Decoding: <span n:render="time" n:data="time_decode" />
       (<span n:render="rate" n:data="rate_decode" />)</li>
       <li>Decrypting: <span n:render="time" n:data="time_decrypt" />
diff --git a/src/allmydata/web/status.py b/src/allmydata/web/status.py
index 06e9f010..2e778883 100644
--- a/src/allmydata/web/status.py
+++ b/src/allmydata/web/status.py
@@ -449,10 +449,6 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
         k, n = data.get_encoding()
         return ctx.tag["Encoding: %s of %s" % (k, n)]
 
-    def render_search_distance(self, ctx, data):
-        d = data.get_search_distance()
-        return ctx.tag["Search Distance: %s peer%s" % (d, plural(d))]
-
     def render_problems(self, ctx, data):
         problems = data.problems
         if not problems:
@@ -553,19 +549,16 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
         k, n = data.get_encoding()
         return ctx.tag["Encoding: %s of %s" % (k, n)]
 
-    def render_peers_queried(self, ctx, data):
-        return ctx.tag["Peers Queried: ", data.peers_queried]
-
     def render_sharemap(self, ctx, data):
-        sharemap = data.sharemap
-        if sharemap is None:
+        servermap = data.get_servermap()
+        if servermap is None:
             return ctx.tag["None"]
         l = T.ul()
+        sharemap = servermap.make_sharemap()
         for shnum in sorted(sharemap.keys()):
             l[T.li["%d -> Placed on " % shnum,
                    ", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
-                              for (peerid,seqnum,root_hash)
-                              in sharemap[shnum]])]]
+                              for peerid in sharemap[shnum]])]]
         return ctx.tag["Sharemap:", l]
 
     def render_problems(self, ctx, data):
@@ -596,21 +589,6 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
     def data_time_setup(self, ctx, data):
         return self.publish_status.timings.get("setup")
 
-    def data_time_query(self, ctx, data):
-        return self.publish_status.timings.get("query")
-
-    def data_time_privkey(self, ctx, data):
-        return self.publish_status.timings.get("privkey")
-
-    def data_time_privkey_fetch(self, ctx, data):
-        return self.publish_status.timings.get("privkey_fetch")
-    def render_privkey_from(self, ctx, data):
-        peerid = data.privkey_from
-        if peerid:
-            return " (got from [%s])" % idlib.shortnodeid_b2a(peerid)
-        else:
-            return ""
-
     def data_time_encrypt(self, ctx, data):
         return self.publish_status.timings.get("encrypt")
     def data_rate_encrypt(self, ctx, data):
@@ -633,23 +611,15 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
     def data_rate_push(self, ctx, data):
         return self._get_rate(data, "push")
 
-    def data_initial_read_size(self, ctx, data):
-        return self.publish_status.initial_read_size
-
     def render_server_timings(self, ctx, data):
-        per_server = self.publish_status.timings.get("per_server")
+        per_server = self.publish_status.timings.get("send_per_server")
         if not per_server:
             return ""
         l = T.ul()
         for peerid in sorted(per_server.keys()):
             peerid_s = idlib.shortnodeid_b2a(peerid)
-            times = []
-            for op,t in per_server[peerid]:
-                if op == "read":
-                    times.append( "(" + self.render_time(None, t) + ")" )
-                else:
-                    times.append( self.render_time(None, t) )
-            times_s = ", ".join(times)
+            times_s = ", ".join([self.render_time(None, t)
+                                 for t in per_server[peerid]])
             l[T.li["[%s]: %s" % (peerid_s, times_s)]]
         return T.li["Per-Server Response Times: ", l]