]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable/servermap.py
ServerMap.copy(): deepcopy .update_data too. Closes #1785.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
index 79a25dd8436b65f5db813442807f69709b66929b..149e1a259e054adc9c068f36ef917ba221d32590 100644 (file)
@@ -1,18 +1,20 @@
 
-import sys, time
+import sys, time, copy
 from zope.interface import implements
 from itertools import count
 from twisted.internet import defer
 from twisted.python import failure
-from foolscap.eventual import eventually
-from allmydata.util import base32, hashutil, idlib, log
-from allmydata import storage
+from foolscap.api import DeadReferenceError, RemoteException, eventually, \
+                         fireEventually
+from allmydata.util import base32, hashutil, log, deferredutil
+from allmydata.util.dictutil import DictOfSets
+from allmydata.storage.server import si_b2a
 from allmydata.interfaces import IServermapUpdaterStatus
 from pycryptopp.publickey import rsa
 
-from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
-     DictOfSets, CorruptShareError, NeedMoreDataError
-from layout import unpack_prefix_and_signature, unpack_header, unpack_share
+from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
+     MODE_READ, MODE_REPAIR, CorruptShareError
+from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
 
 class UpdateStatus:
     implements(IServermapUpdaterStatus)
@@ -32,11 +34,11 @@ class UpdateStatus:
         self.started = time.time()
         self.finished = None
 
-    def add_per_server_time(self, peerid, op, sent, elapsed):
+    def add_per_server_time(self, server, op, sent, elapsed):
         assert op in ("query", "late", "privkey")
-        if peerid not in self.timings["per_server"]:
-            self.timings["per_server"][peerid] = []
-        self.timings["per_server"][peerid].append((op,sent,elapsed))
+        if server not in self.timings["per_server"]:
+            self.timings["per_server"][server] = []
+        self.timings["per_server"][server].append((op,sent,elapsed))
 
     def get_started(self):
         return self.started
@@ -67,8 +69,8 @@ class UpdateStatus:
         self.storage_index = si
     def set_mode(self, mode):
         self.mode = mode
-    def set_privkey_from(self, peerid):
-        self.privkey_from = peerid
+    def set_privkey_from(self, server):
+        self.privkey_from = server
     def set_status(self, status):
         self.status = status
     def set_progress(self, value):
@@ -93,109 +95,159 @@ class ServerMap:
     has changed since I last retrieved this data'. This reduces the chances
     of clobbering a simultaneous (uncoordinated) write.
 
-    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
-                     (versionid, timestamp) tuple. Each 'versionid' is a
-                     tuple of (seqnum, root_hash, IV, segsize, datalength,
-                     k, N, signed_prefix, offsets)
-
-    @ivar connections: maps peerid to a RemoteReference
-
-    @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
-                      shares that I should ignore (because a previous user of
-                      the servermap determined that they were invalid). The
-                      updater only locates a certain number of shares: if
-                      some of these turn out to have integrity problems and
-                      are unusable, the caller will need to mark those shares
-                      as bad, then re-update the servermap, then try again.
+    @var _known_shares: a dictionary, mapping a (server, shnum) tuple to a
+                        (versionid, timestamp) tuple. Each 'versionid' is a
+                        tuple of (seqnum, root_hash, IV, segsize, datalength,
+                        k, N, signed_prefix, offsets)
+
+    @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
+                       shares that I should ignore (because a previous user
+                       of the servermap determined that they were invalid).
+                       The updater only locates a certain number of shares:
+                       if some of these turn out to have integrity problems
+                       and are unusable, the caller will need to mark those
+                       shares as bad, then re-update the servermap, then try
+                       again. The dict maps (server, shnum) tuple to old
+                       checkstring.
     """
 
     def __init__(self):
-        self.servermap = {}
-        self.connections = {}
-        self.unreachable_peers = set() # peerids that didn't respond to queries
-        self.problems = [] # mostly for debugging
-        self.bad_shares = set()
-        self.last_update_mode = None
-        self.last_update_time = 0
-
-    def mark_bad_share(self, peerid, shnum):
-        """This share was found to be bad, not in the checkstring or
-        signature, but deeper in the share, detected at retrieve time. Remove
-        it from our list of useful shares, and remember that it is bad so we
-        don't add it back again later.
+        self._known_shares = {}
+        self.unreachable_servers = set() # servers that didn't respond to queries
+        self.reachable_servers = set() # servers that did respond to queries
+        self._problems = [] # mostly for debugging
+        self._bad_shares = {} # maps (server,shnum) to old checkstring
+        self._last_update_mode = None
+        self._last_update_time = 0
+        self.proxies = {}
+        self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
+        # where blockhashes is a list of bytestrings (the result of
+        # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
+        # (block,salt) tuple-of-bytestrings from get_block_and_salt()
+
+    def copy(self):
+        s = ServerMap()
+        s._known_shares = self._known_shares.copy() # tuple->tuple
+        s.unreachable_servers = set(self.unreachable_servers)
+        s.reachable_servers = set(self.reachable_servers)
+        s._problems = self._problems[:]
+        s._bad_shares = self._bad_shares.copy() # tuple->str
+        s._last_update_mode = self._last_update_mode
+        s._last_update_time = self._last_update_time
+        s.update_data = copy.deepcopy(self.update_data)
+        return s
+
+    def get_reachable_servers(self):
+        return self.reachable_servers
+
+    def mark_server_reachable(self, server):
+        self.reachable_servers.add(server)
+
+    def mark_server_unreachable(self, server):
+        self.unreachable_servers.add(server)
+
+    def mark_bad_share(self, server, shnum, checkstring):
+        """This share was found to be bad, either in the checkstring or
+        signature (detected during mapupdate), or deeper in the share
+        (detected at retrieve time). Remove it from our list of useful
+        shares, and remember that it is bad so we don't add it back again
+        later. We record the share's old checkstring (which might be
+        corrupted or badly signed) so that a repair operation can do the
+        test-and-set using it as a reference.
         """
-        key = (peerid, shnum)
-        self.bad_shares.add(key)
-        self.servermap.pop(key, None)
+        key = (server, shnum) # record checkstring
+        self._bad_shares[key] = checkstring
+        self._known_shares.pop(key, None)
+
+    def get_bad_shares(self):
+        # key=(server,shnum) -> checkstring
+        return self._bad_shares
 
-    def add_new_share(self, peerid, shnum, verinfo, timestamp):
+    def add_new_share(self, server, shnum, verinfo, timestamp):
         """We've written a new share out, replacing any that was there
         before."""
-        key = (peerid, shnum)
-        self.bad_shares.discard(key)
-        self.servermap[key] = (verinfo, timestamp)
+        key = (server, shnum)
+        self._bad_shares.pop(key, None)
+        self._known_shares[key] = (verinfo, timestamp)
+
+    def add_problem(self, f):
+        self._problems.append(f)
+    def get_problems(self):
+        return self._problems
+
+    def set_last_update(self, mode, when):
+        self._last_update_mode = mode
+        self._last_update_time = when
+    def get_last_update(self):
+        return (self._last_update_mode, self._last_update_time)
 
     def dump(self, out=sys.stdout):
         print >>out, "servermap:"
 
-        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
+        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
-                          (idlib.shortnodeid_b2a(peerid), shnum,
+                          (server.get_name(), shnum,
                            seqnum, base32.b2a(root_hash)[:4], k, N,
                            datalength))
-        if self.problems:
-            print >>out, "%d PROBLEMS" % len(self.problems)
-            for f in self.problems:
+        if self._problems:
+            print >>out, "%d PROBLEMS" % len(self._problems)
+            for f in self._problems:
                 print >>out, str(f)
         return out
 
-    def all_peers(self):
-        return set([peerid
-                    for (peerid, shnum)
-                    in self.servermap])
+    def all_servers(self):
+        return set([server for (server, shnum) in self._known_shares])
+
+    def all_servers_for_version(self, verinfo):
+        """Return a set of servers that hold shares for the given version."""
+        return set([server
+                    for ( (server, shnum), (verinfo2, timestamp) )
+                    in self._known_shares.items()
+                    if verinfo == verinfo2])
+
+    def get_known_shares(self):
+        # maps (server,shnum) to (versionid,timestamp)
+        return self._known_shares
 
     def make_sharemap(self):
-        """Return a dict that maps shnum to a set of peerds that hold it."""
+        """Return a dict that maps shnum to a set of servers that hold it."""
         sharemap = DictOfSets()
-        for (peerid, shnum) in self.servermap:
-            sharemap.add(shnum, peerid)
+        for (server, shnum) in self._known_shares:
+            sharemap.add(shnum, server)
         return sharemap
 
     def make_versionmap(self):
-        """Return a dict that maps versionid to sets of (shnum, peerid,
+        """Return a dict that maps versionid to sets of (shnum, server,
         timestamp) tuples."""
         versionmap = DictOfSets()
-        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
-            versionmap.add(verinfo, (shnum, peerid, timestamp))
+        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
+            versionmap.add(verinfo, (shnum, server, timestamp))
         return versionmap
 
-    def shares_on_peer(self, peerid):
-        return set([shnum
-                    for (s_peerid, shnum)
-                    in self.servermap
-                    if s_peerid == peerid])
+    def debug_shares_on_server(self, server): # used by tests
+        return set([shnum for (s, shnum) in self._known_shares if s == server])
 
-    def version_on_peer(self, peerid, shnum):
-        key = (peerid, shnum)
-        if key in self.servermap:
-            (verinfo, timestamp) = self.servermap[key]
+    def version_on_server(self, server, shnum):
+        key = (server, shnum)
+        if key in self._known_shares:
+            (verinfo, timestamp) = self._known_shares[key]
             return verinfo
         return None
 
     def shares_available(self):
         """Return a dict that maps verinfo to tuples of
-        (num_distinct_shares, k) tuples."""
+        (num_distinct_shares, k, N) tuples."""
         versionmap = self.make_versionmap()
         all_shares = {}
         for verinfo, shares in versionmap.items():
             s = set()
-            for (shnum, peerid, timestamp) in shares:
+            for (shnum, server, timestamp) in shares:
                 s.add(shnum)
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
-            all_shares[verinfo] = (len(s), k)
+            all_shares[verinfo] = (len(s), k, N)
         return all_shares
 
     def highest_seqnum(self):
@@ -205,28 +257,31 @@ class ServerMap:
         seqnums.append(0)
         return max(seqnums)
 
+    def summarize_version(self, verinfo):
+        """Take a versionid, return a string that describes it."""
+        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+         offsets_tuple) = verinfo
+        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])
+
     def summarize_versions(self):
         """Return a string describing which versions we know about."""
         versionmap = self.make_versionmap()
         bits = []
         for (verinfo, shares) in versionmap.items():
-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-             offsets_tuple) = verinfo
-            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
-            bits.append("%d*seq%d-%s" %
-                        (len(shnums), seqnum, base32.b2a(root_hash)[:4]))
+            vstr = self.summarize_version(verinfo)
+            shnums = set([shnum for (shnum, server, timestamp) in shares])
+            bits.append("%d*%s" % (len(shnums), vstr))
         return "/".join(bits)
 
     def recoverable_versions(self):
         """Return a set of versionids, one for each version that is currently
         recoverable."""
         versionmap = self.make_versionmap()
-
         recoverable_versions = set()
         for (verinfo, shares) in versionmap.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
-            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            shnums = set([shnum for (shnum, server, timestamp) in shares])
             if len(shnums) >= k:
                 # this one is recoverable
                 recoverable_versions.add(verinfo)
@@ -242,7 +297,7 @@ class ServerMap:
         for (verinfo, shares) in versionmap.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
-            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            shnums = set([shnum for (shnum, server, timestamp) in shares])
             if len(shnums) < k:
                 unrecoverable_versions.add(verinfo)
 
@@ -258,6 +313,13 @@ class ServerMap:
             return recoverable[-1]
         return None
 
+    def size_of_version(self, verinfo):
+        """Given a versionid (perhaps returned by best_recoverable_version),
+        return the size of the file in bytes."""
+        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+         offsets_tuple) = verinfo
+        return datalength
+
     def unrecoverable_newer_versions(self):
         # Return a dict of versionid -> health, for versions that are
         # unrecoverable and have later seqnums than any recoverable versions.
@@ -269,7 +331,7 @@ class ServerMap:
         for (verinfo, shares) in versionmap.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
-            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            shnums = set([shnum for (shnum, server, timestamp) in shares])
             healths[verinfo] = (len(shnums),k)
             if len(shnums) < k:
                 unrecoverable.add(verinfo)
@@ -292,19 +354,44 @@ class ServerMap:
         # same seqnum, meaning that MutableFileNode.read_best_version is not
         # giving you the whole story, and that using its data to do a
         # subsequent publish will lose information.
-        return bool(len(self.recoverable_versions()) > 1)
+        recoverable_seqnums = [verinfo[0]
+                               for verinfo in self.recoverable_versions()]
+        for seqnum in recoverable_seqnums:
+            if recoverable_seqnums.count(seqnum) > 1:
+                return True
+        return False
+
+
+    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
+        """
+        I return the update data for the given shnum
+        """
+        update_data = self.update_data[shnum]
+        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
+        return update_datum
+
+
+    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
+        """
+        I record the block hash tree for the given shnum.
+        """
+        self.update_data.setdefault(shnum , []).append((verinfo, data))
 
 
 class ServermapUpdater:
-    def __init__(self, filenode, servermap, mode=MODE_READ):
+    def __init__(self, filenode, storage_broker, monitor, servermap,
+                 mode=MODE_READ, add_lease=False, update_range=None):
         """I update a servermap, locating a sufficient number of useful
         shares and remembering where they are located.
 
         """
 
         self._node = filenode
+        self._storage_broker = storage_broker
+        self._monitor = monitor
         self._servermap = servermap
         self.mode = mode
+        self._add_lease = add_lease
         self._running = True
 
         self._storage_index = filenode.get_storage_index()
@@ -315,7 +402,10 @@ class ServermapUpdater:
         self._status.set_progress(0.0)
         self._status.set_mode(mode)
 
+        self._servers_responded = set()
+
         # how much data should we read?
+        # SDMF:
         #  * if we only need the checkstring, then [0:75]
         #  * if we need to validate the checkstring sig, then [543ish:799ish]
         #  * if we need the verification key, then [107:436ish]
@@ -323,19 +413,39 @@ class ServermapUpdater:
         #  * if we need the encrypted private key, we want [-1216ish:]
         #   * but we can't read from negative offsets
         #   * the offset table tells us the 'ish', also the positive offset
-        # A future version of the SMDF slot format should consider using
-        # fixed-size slots so we can retrieve less data. For now, we'll just
-        # read 2000 bytes, which also happens to read enough actual data to
-        # pre-fetch a 9-entry dirnode.
-        self._read_size = 2000
+        # MDMF:
+        #  * Checkstring? [0:72]
+        #  * If we want to validate the checkstring, then [0:72], [143:?] --
+        #    the offset table will tell us for sure.
+        #  * If we need the verification key, we have to consult the offset
+        #    table as well.
+        # At this point, we don't know which we are. Our filenode can
+        # tell us, but it might be lying -- in some cases, we're
+        # responsible for telling it which kind of file it is.
+        self._read_size = 4000
         if mode == MODE_CHECK:
             # we use unpack_prefix_and_signature, so we need 1k
             self._read_size = 1000
         self._need_privkey = False
-        if mode == MODE_WRITE and not self._node._privkey:
-            self._need_privkey = True
 
-        prefix = storage.si_b2a(self._storage_index)[:5]
+        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
+            self._need_privkey = True
+        # check+repair: repair requires the privkey, so if we didn't happen
+        # to ask for it during the check, we'll have problems doing the
+        # publish.
+
+        self.fetch_update_data = False
+        if mode == MODE_WRITE and update_range:
+            # We're updating the servermap in preparation for an
+            # in-place file update, so we need to fetch some additional
+            # data from each share that we find.
+            assert len(update_range) == 2
+
+            self.start_segment = update_range[0]
+            self.end_segment = update_range[1]
+            self.fetch_update_data = True
+
+        prefix = si_b2a(self._storage_index)[:5]
         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                    si=prefix, mode=mode)
 
@@ -345,6 +455,8 @@ class ServermapUpdater:
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_number
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.mutable.mapupdate"
         return log.msg(*args, **kwargs)
 
     def update(self):
@@ -358,240 +470,392 @@ class ServermapUpdater:
         # avoid re-checking the signatures for each share.
         self._valid_versions = set()
 
-        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
-        # timestamp) tuples. This is used to figure out which versions might
-        # be retrievable, and to make the eventual data download faster.
-        self.versionmap = DictOfSets()
-
         self._done_deferred = defer.Deferred()
 
-        # first, which peers should be talk to? Any that were in our old
+        # first, which servers should be talk to? Any that were in our old
         # servermap, plus "enough" others.
 
         self._queries_completed = 0
 
-        client = self._node._client
-        full_peerlist = client.get_permuted_peers("storage",
-                                                  self._node._storage_index)
-        self.full_peerlist = full_peerlist # for use later, immutable
-        self.extra_peers = full_peerlist[:] # peers are removed as we use them
-        self._good_peers = set() # peers who had some shares
-        self._empty_peers = set() # peers who don't have any shares
-        self._bad_peers = set() # peers to whom our queries failed
+        sb = self._storage_broker
+        # All of the servers, permuted by the storage index, as usual.
+        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
+        self.full_serverlist = full_serverlist # for use later, immutable
+        self.extra_servers = full_serverlist[:] # servers are removed as we use them
+        self._good_servers = set() # servers who had some shares
+        self._empty_servers = set() # servers who don't have any shares
+        self._bad_servers = set() # servers to whom our queries failed
 
         k = self._node.get_required_shares()
+        # For what cases can these conditions work?
         if k is None:
             # make a guess
             k = 3
-        N = self._node.get_required_shares()
+        N = self._node.get_total_shares()
         if N is None:
             N = 10
         self.EPSILON = k
-        # we want to send queries to at least this many peers (although we
+        # we want to send queries to at least this many servers (although we
         # might not wait for all of their answers to come back)
-        self.num_peers_to_query = k + self.EPSILON
+        self.num_servers_to_query = k + self.EPSILON
 
-        if self.mode == MODE_CHECK:
-            initial_peers_to_query = dict(full_peerlist)
-            must_query = set(initial_peers_to_query.keys())
-            self.extra_peers = []
+        if self.mode in (MODE_CHECK, MODE_REPAIR):
+            # We want to query all of the servers.
+            initial_servers_to_query = list(full_serverlist)
+            must_query = set(initial_servers_to_query)
+            self.extra_servers = []
         elif self.mode == MODE_WRITE:
             # we're planning to replace all the shares, so we want a good
             # chance of finding them all. We will keep searching until we've
             # seen epsilon that don't have a share.
-            self.num_peers_to_query = N + self.EPSILON
-            initial_peers_to_query, must_query = self._build_initial_querylist()
-            self.required_num_empty_peers = self.EPSILON
+            # We don't query all of the servers because that could take a while.
+            self.num_servers_to_query = N + self.EPSILON
+            initial_servers_to_query, must_query = self._build_initial_querylist()
+            self.required_num_empty_servers = self.EPSILON
 
             # TODO: arrange to read lots of data from k-ish servers, to avoid
             # the extra round trip required to read large directories. This
             # might also avoid the round trip required to read the encrypted
             # private key.
 
-        else:
-            initial_peers_to_query, must_query = self._build_initial_querylist()
-
-        # this is a set of peers that we are required to get responses from:
-        # they are peers who used to have a share, so we need to know where
-        # they currently stand, even if that means we have to wait for a
-        # silently-lost TCP connection to time out. We remove peers from this
-        # set as we get responses.
-        self._must_query = must_query
-
-        # now initial_peers_to_query contains the peers that we should ask,
-        # self.must_query contains the peers that we must have heard from
-        # before we can consider ourselves finished, and self.extra_peers
-        # contains the overflow (peers that we should tap if we don't get
-        # enough responses)
-
-        self._send_initial_requests(initial_peers_to_query)
+        else: # MODE_READ, MODE_ANYTHING
+            # 2*k servers is good enough.
+            initial_servers_to_query, must_query = self._build_initial_querylist()
+
+        # this is a set of servers that we are required to get responses
+        # from: they are servers who used to have a share, so we need to know
+        # where they currently stand, even if that means we have to wait for
+        # a silently-lost TCP connection to time out. We remove servers from
+        # this set as we get responses.
+        self._must_query = set(must_query)
+
+        # now initial_servers_to_query contains the servers that we should
+        # ask, self.must_query contains the servers that we must have heard
+        # from before we can consider ourselves finished, and
+        # self.extra_servers contains the overflow (servers that we should
+        # tap if we don't get enough responses)
+        # I guess that self._must_query is a subset of
+        # initial_servers_to_query?
+        assert must_query.issubset(initial_servers_to_query)
+
+        self._send_initial_requests(initial_servers_to_query)
         self._status.timings["initial_queries"] = time.time() - self._started
         return self._done_deferred
 
     def _build_initial_querylist(self):
-        initial_peers_to_query = {}
-        must_query = set()
-        for peerid in self._servermap.all_peers():
-            ss = self._servermap.connections[peerid]
-            # we send queries to everyone who was already in the sharemap
-            initial_peers_to_query[peerid] = ss
-            # and we must wait for responses from them
-            must_query.add(peerid)
-
-        while ((self.num_peers_to_query > len(initial_peers_to_query))
-               and self.extra_peers):
-            (peerid, ss) = self.extra_peers.pop(0)
-            initial_peers_to_query[peerid] = ss
-
-        return initial_peers_to_query, must_query
-
-    def _send_initial_requests(self, peerlist):
-        self._status.set_status("Sending %d initial queries" % len(peerlist))
+        # we send queries to everyone who was already in the sharemap
+        initial_servers_to_query = set(self._servermap.all_servers())
+        # and we must wait for responses from them
+        must_query = set(initial_servers_to_query)
+
+        while ((self.num_servers_to_query > len(initial_servers_to_query))
+               and self.extra_servers):
+            initial_servers_to_query.add(self.extra_servers.pop(0))
+
+        return initial_servers_to_query, must_query
+
+    def _send_initial_requests(self, serverlist):
+        self._status.set_status("Sending %d initial queries" % len(serverlist))
         self._queries_outstanding = set()
-        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
-        dl = []
-        for (peerid, ss) in peerlist.items():
-            self._queries_outstanding.add(peerid)
-            self._do_query(ss, peerid, self._storage_index, self._read_size)
+        for server in serverlist:
+            self._queries_outstanding.add(server)
+            self._do_query(server, self._storage_index, self._read_size)
+
+        if not serverlist:
+            # there is nobody to ask, so we need to short-circuit the state
+            # machine.
+            d = defer.maybeDeferred(self._check_for_done, None)
+            d.addErrback(self._fatal_error)
 
         # control flow beyond this point: state machine. Receiving responses
         # from queries is the input. We might send out more queries, or we
         # might produce a result.
         return None
 
-    def _do_query(self, ss, peerid, storage_index, readsize):
-        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
-                 peerid=idlib.shortnodeid_b2a(peerid),
+    def _do_query(self, server, storage_index, readsize):
+        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
+                 name=server.get_name(),
                  readsize=readsize,
                  level=log.NOISY)
-        self._servermap.connections[peerid] = ss
         started = time.time()
-        self._queries_outstanding.add(peerid)
-        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
-        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
+        self._queries_outstanding.add(server)
+        d = self._do_read(server, storage_index, [], [(0, readsize)])
+        d.addCallback(self._got_results, server, readsize, storage_index,
                       started)
-        d.addErrback(self._query_failed, peerid)
+        d.addErrback(self._query_failed, server)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
         d.addErrback(log.err)
-        d.addBoth(self._check_for_done)
         d.addErrback(self._fatal_error)
+        d.addCallback(self._check_for_done)
         return d
 
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
+    def _do_read(self, server, storage_index, shnums, readv):
+        ss = server.get_rref()
+        if self._add_lease:
+            # send an add-lease message in parallel. The results are handled
+            # separately. This is sent before the slot_readv() so that we can
+            # be sure the add_lease is retired by the time slot_readv comes
+            # back (this relies upon our knowledge that the server code for
+            # add_lease is synchronous).
+            renew_secret = self._node.get_renewal_secret(server)
+            cancel_secret = self._node.get_cancel_secret(server)
+            d2 = ss.callRemote("add_lease", storage_index,
+                               renew_secret, cancel_secret)
+            # we ignore success
+            d2.addErrback(self._add_lease_failed, server, storage_index)
         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
         return d
 
-    def _got_results(self, datavs, peerid, readsize, stuff, started):
-        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
-                     peerid=idlib.shortnodeid_b2a(peerid),
-                     numshares=len(datavs),
-                     level=log.NOISY)
+
+    def _got_corrupt_share(self, e, shnum, server, data, lp):
+        """
+        I am called when a remote server returns a corrupt share in
+        response to one of our queries. By corrupt, I mean a share
+        without a valid signature. I then record the failure, notify the
+        server of the corruption, and record the share as bad.
+        """
+        f = failure.Failure(e)
+        self.log(format="bad share: %(f_value)s", f_value=str(f),
+                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
+        # Notify the server that its share is corrupt.
+        self.notify_server_corruption(server, shnum, str(e))
+        # By flagging this as a bad server, we won't count any of
+        # the other shares on that server as valid, though if we
+        # happen to find a valid version string amongst those
+        # shares, we'll keep track of it so that we don't need
+        # to validate the signature on those again.
+        self._bad_servers.add(server)
+        self._last_failure = f
+        # XXX: Use the reader for this?
+        checkstring = data[:SIGNED_PREFIX_LENGTH]
+        self._servermap.mark_bad_share(server, shnum, checkstring)
+        self._servermap.add_problem(f)
+
+
+    def _got_results(self, datavs, server, readsize, storage_index, started):
+        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
+                      name=server.get_name(),
+                      numshares=len(datavs))
+        ss = server.get_rref()
         now = time.time()
         elapsed = now - started
-        self._queries_outstanding.discard(peerid)
-        self._must_query.discard(peerid)
-        self._queries_completed += 1
+        def _done_processing(ignored=None):
+            self._queries_outstanding.discard(server)
+            self._servermap.mark_server_reachable(server)
+            self._must_query.discard(server)
+            self._queries_completed += 1
         if not self._running:
             self.log("but we're not running, so we'll ignore it", parent=lp)
-            self._status.add_per_server_time(peerid, "late", started, elapsed)
+            _done_processing()
+            self._status.add_per_server_time(server, "late", started, elapsed)
             return
-        self._status.add_per_server_time(peerid, "query", started, elapsed)
+        self._status.add_per_server_time(server, "query", started, elapsed)
 
         if datavs:
-            self._good_peers.add(peerid)
+            self._good_servers.add(server)
         else:
-            self._empty_peers.add(peerid)
+            self._empty_servers.add(server)
+
+        ds = []
 
-        last_verinfo = None
-        last_shnum = None
         for shnum,datav in datavs.items():
             data = datav[0]
-            try:
-                verinfo = self._got_results_one_share(shnum, data, peerid)
-                last_verinfo = verinfo
-                last_shnum = shnum
-            except CorruptShareError, e:
-                # log it and give the other shares a chance to be processed
-                f = failure.Failure()
-                self.log("bad share: %s %s" % (f, f.value),
-                         parent=lp, level=log.WEIRD)
-                self._bad_peers.add(peerid)
-                self._last_failure = f
-                self._servermap.problems.append(f)
-                pass
-
-        self._status.timings["cumulative_verify"] += (time.time() - now)
-
-        if self._need_privkey and last_verinfo:
-            # send them a request for the privkey. We send one request per
-            # server.
-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-             offsets_tuple) = last_verinfo
-            o = dict(offsets_tuple)
-
-            self._queries_outstanding.add(peerid)
-            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
-            ss = self._servermap.connections[peerid]
-            privkey_started = time.time()
-            d = self._do_read(ss, peerid, self._storage_index,
-                              [last_shnum], readv)
-            d.addCallback(self._got_privkey_results, peerid, last_shnum,
-                          privkey_started)
-            d.addErrback(self._privkey_query_failed, peerid, last_shnum)
-            d.addErrback(log.err)
-            d.addCallback(self._check_for_done)
-            d.addErrback(self._fatal_error)
-
+            reader = MDMFSlotReadProxy(ss,
+                                       storage_index,
+                                       shnum,
+                                       data,
+                                       data_is_everything=(len(data) < readsize))
+
+            # our goal, with each response, is to validate the version
+            # information and share data as best we can at this point --
+            # we do this by validating the signature. To do this, we
+            # need to do the following:
+            #   - If we don't already have the public key, fetch the
+            #     public key. We use this to validate the signature.
+            if not self._node.get_pubkey():
+                # fetch and set the public key.
+                d = reader.get_verification_key()
+                d.addCallback(lambda results, shnum=shnum:
+                              self._try_to_set_pubkey(results, server, shnum, lp))
+                # XXX: Make self._pubkey_query_failed?
+                d.addErrback(lambda error, shnum=shnum, data=data:
+                             self._got_corrupt_share(error, shnum, server, data, lp))
+            else:
+                # we already have the public key.
+                d = defer.succeed(None)
+
+            # Neither of these two branches return anything of
+            # consequence, so the first entry in our deferredlist will
+            # be None.
+
+            # - Next, we need the version information. We almost
+            #   certainly got this by reading the first thousand or so
+            #   bytes of the share on the storage server, so we
+            #   shouldn't need to fetch anything at this step.
+            d2 = reader.get_verinfo()
+            d2.addErrback(lambda error, shnum=shnum, data=data:
+                          self._got_corrupt_share(error, shnum, server, data, lp))
+            # - Next, we need the signature. For an SDMF share, it is
+            #   likely that we fetched this when doing our initial fetch
+            #   to get the version information. In MDMF, this lives at
+            #   the end of the share, so unless the file is quite small,
+            #   we'll need to do a remote fetch to get it.
+            d3 = reader.get_signature()
+            d3.addErrback(lambda error, shnum=shnum, data=data:
+                          self._got_corrupt_share(error, shnum, server, data, lp))
+            #  Once we have all three of these responses, we can move on
+            #  to validating the signature
+
+            # Does the node already have a privkey? If not, we'll try to
+            # fetch it here.
+            if self._need_privkey:
+                d4 = reader.get_encprivkey()
+                d4.addCallback(lambda results, shnum=shnum:
+                               self._try_to_validate_privkey(results, server, shnum, lp))
+                d4.addErrback(lambda error, shnum=shnum:
+                              self._privkey_query_failed(error, server, shnum, lp))
+            else:
+                d4 = defer.succeed(None)
+
+
+            if self.fetch_update_data:
+                # fetch the block hash tree and first + last segment, as
+                # configured earlier.
+                # Then set them in wherever we happen to want to set
+                # them.
+                ds = []
+                # XXX: We do this above, too. Is there a good way to
+                # make the two routines share the value without
+                # introducing more roundtrips?
+                ds.append(reader.get_verinfo())
+                ds.append(reader.get_blockhashes())
+                ds.append(reader.get_block_and_salt(self.start_segment))
+                ds.append(reader.get_block_and_salt(self.end_segment))
+                d5 = deferredutil.gatherResults(ds)
+                d5.addCallback(self._got_update_results_one_share, shnum)
+            else:
+                d5 = defer.succeed(None)
+
+            dl = defer.DeferredList([d, d2, d3, d4, d5])
+            def _append_proxy(passthrough, shnum=shnum, reader=reader):
+                # Store the proxy (with its cache) keyed by serverid and
+                # version.
+                _, (_,verinfo), _, _, _ = passthrough
+                verinfo = self._make_verinfo_hashable(verinfo)
+                self._servermap.proxies[(verinfo,
+                                         server.get_serverid(),
+                                         storage_index, shnum)] = reader
+                return passthrough
+            dl.addCallback(_append_proxy)
+            dl.addBoth(self._turn_barrier)
+            dl.addCallback(lambda results, shnum=shnum:
+                           self._got_signature_one_share(results, shnum, server, lp))
+            dl.addErrback(lambda error, shnum=shnum, data=data:
+                          self._got_corrupt_share(error, shnum, server, data, lp))
+            ds.append(dl)
+        # dl is a deferred list that will fire when all of the shares
+        # that we found on this server are done processing. When dl fires,
+        # we know that processing is done, so we can decrement the
+        # semaphore-like thing that we incremented earlier.
+        dl = defer.DeferredList(ds, fireOnOneErrback=True)
+        # Are we done? Done means that there are no more queries to
+        # send, that there are no outstanding queries, and that we
+        # haven't received any queries that are still processing. If we
+        # are done, self._check_for_done will cause the done deferred
+        # that we returned to our caller to fire, which tells them that
+        # they have a complete servermap, and that we won't be touching
+        # the servermap anymore.
+        dl.addCallback(_done_processing)
+        dl.addCallback(self._check_for_done)
+        dl.addErrback(self._fatal_error)
         # all done!
-        self.log("_got_results done", parent=lp)
-
-    def _got_results_one_share(self, shnum, data, peerid):
-        lp = self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
-                      shnum=shnum,
-                      peerid=idlib.shortnodeid_b2a(peerid))
-
-        # this might raise NeedMoreDataError, if the pubkey and signature
-        # live at some weird offset. That shouldn't happen, so I'm going to
-        # treat it as a bad share.
-        (seqnum, root_hash, IV, k, N, segsize, datalength,
-         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
-
-        if not self._node._pubkey:
-            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
-            assert len(fingerprint) == 32
-            if fingerprint != self._node._fingerprint:
-                raise CorruptShareError(peerid, shnum,
-                                        "pubkey doesn't match fingerprint")
-            self._node._pubkey = self._deserialize_pubkey(pubkey_s)
-
-        if self._need_privkey:
-            self._try_to_extract_privkey(data, peerid, shnum)
-
-        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
-         ig_segsize, ig_datalen, offsets) = unpack_header(data)
-        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
+        self.log("_got_results done", parent=lp, level=log.NOISY)
+        return dl
+
+
+    def _turn_barrier(self, result):
+        """
+        I help the servermap updater avoid the recursion limit issues
+        discussed in #237.
+        """
+        return fireEventually(result)
+
+
+    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
+        if self._node.get_pubkey():
+            return # don't go through this again if we don't have to
+        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+        assert len(fingerprint) == 32
+        if fingerprint != self._node.get_fingerprint():
+            raise CorruptShareError(server, shnum,
+                                    "pubkey doesn't match fingerprint")
+        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
+        assert self._node.get_pubkey()
+
+
+    def notify_server_corruption(self, server, shnum, reason):
+        rref = server.get_rref()
+        rref.callRemoteOnly("advise_corrupt_share",
+                            "mutable", self._storage_index, shnum, reason)
+
+
+    def _got_signature_one_share(self, results, shnum, server, lp):
+        # It is our job to give versioninfo to our caller. We need to
+        # raise CorruptShareError if the share is corrupt for any
+        # reason, something that our caller will handle.
+        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
+                 shnum=shnum,
+                 name=server.get_name(),
+                 level=log.NOISY,
+                 parent=lp)
+        if not self._running:
+            # We can't process the results, since we can't touch the
+            # servermap anymore.
+            self.log("but we're not running anymore.")
+            return None
+
+        _, verinfo, signature, __, ___ = results
+        verinfo = self._make_verinfo_hashable(verinfo[1])
+
+        # This tuple uniquely identifies a share on the grid; we use it
+        # to keep track of the ones that we've already seen.
+        (seqnum,
+         root_hash,
+         saltish,
+         segsize,
+         datalen,
+         k,
+         n,
+         prefix,
+         offsets_tuple) = verinfo
 
-        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-                   offsets_tuple)
 
         if verinfo not in self._valid_versions:
-            # it's a new pair. Verify the signature.
-            valid = self._node._pubkey.verify(prefix, signature)
+            # This is a new version tuple, and we need to validate it
+            # against the public key before keeping track of it.
+            assert self._node.get_pubkey()
+            valid = self._node.get_pubkey().verify(prefix, signature[1])
             if not valid:
-                raise CorruptShareError(peerid, shnum, "signature is invalid")
-
-            # ok, it's a valid verinfo. Add it to the list of validated
-            # versions.
-            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
-                     % (seqnum, base32.b2a(root_hash)[:4],
-                        idlib.shortnodeid_b2a(peerid), shnum,
-                        k, N, segsize, datalength),
-                     parent=lp)
-            self._valid_versions.add(verinfo)
-        # We now know that this is a valid candidate verinfo.
-
-        if (peerid, shnum) in self._servermap.bad_shares:
+                raise CorruptShareError(server, shnum,
+                                        "signature is invalid")
+
+        # ok, it's a valid verinfo. Add it to the list of validated
+        # versions.
+        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
+                 % (seqnum, base32.b2a(root_hash)[:4],
+                    server.get_name(), shnum,
+                    k, n, segsize, datalen),
+                    parent=lp)
+        self._valid_versions.add(verinfo)
+        # We now know that this is a valid candidate verinfo. Whether or
+        # not this instance of it is valid is a matter for the next
+        # statement; at this point, we just know that if we see this
+        # version info again, that its signature checks out and that
+        # we're okay to skip the signature-checking step.
+
+        # (server, shnum) are bound in the method invocation.
+        if (server, shnum) in self._servermap.get_bad_shares():
             # we've been told that the rest of the data in this share is
             # unusable, so don't add it to the servermap.
             self.log("but we've been told this is a bad share",
@@ -600,113 +864,161 @@ class ServermapUpdater:
 
         # Add the info to our servermap.
         timestamp = time.time()
-        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
-        # and the versionmap
-        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
+        self._servermap.add_new_share(server, shnum, verinfo, timestamp)
+
         return verinfo
 
-    def _deserialize_pubkey(self, pubkey_s):
-        verifier = rsa.create_verifying_key_from_string(pubkey_s)
-        return verifier
+    def _make_verinfo_hashable(self, verinfo):
+        (seqnum,
+         root_hash,
+         saltish,
+         segsize,
+         datalen,
+         k,
+         n,
+         prefix,
+         offsets) = verinfo
 
-    def _try_to_extract_privkey(self, data, peerid, shnum):
-        try:
-            r = unpack_share(data)
-        except NeedMoreDataError, e:
-            # this share won't help us. oh well.
-            offset = e.encprivkey_offset
-            length = e.encprivkey_length
-            self.log("shnum %d on peerid %s: share was too short (%dB) "
-                     "to get the encprivkey; [%d:%d] ought to hold it" %
-                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
-                      offset, offset+length))
-            # NOTE: if uncoordinated writes are taking place, someone might
-            # change the share (and most probably move the encprivkey) before
-            # we get a chance to do one of these reads and fetch it. This
-            # will cause us to see a NotEnoughSharesError(unable to fetch
-            # privkey) instead of an UncoordinatedWriteError . This is a
-            # nuisance, but it will go away when we move to DSA-based mutable
-            # files (since the privkey will be small enough to fit in the
-            # write cap).
+        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
 
-            return
+        verinfo = (seqnum,
+                   root_hash,
+                   saltish,
+                   segsize,
+                   datalen,
+                   k,
+                   n,
+                   prefix,
+                   offsets_tuple)
+        return verinfo
 
-        (seqnum, root_hash, IV, k, N, segsize, datalen,
-         pubkey, signature, share_hash_chain, block_hash_tree,
-         share_data, enc_privkey) = r
+    def _got_update_results_one_share(self, results, share):
+        """
+        I record the update results in results.
+        """
+        assert len(results) == 4
+        verinfo, blockhashes, start, end = results
+        verinfo = self._make_verinfo_hashable(verinfo)
+        update_data = (blockhashes, start, end)
+        self._servermap.set_update_data_for_share_and_verinfo(share,
+                                                              verinfo,
+                                                              update_data)
 
-        return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
 
-    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
+    def _deserialize_pubkey(self, pubkey_s):
+        verifier = rsa.create_verifying_key_from_string(pubkey_s)
+        return verifier
+
 
+    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
+        """
+        Given a writekey from a remote server, I validate it against the
+        writekey stored in my node. If it is valid, then I set the
+        privkey and encprivkey properties of the node.
+        """
         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
         if alleged_writekey != self._node.get_writekey():
             self.log("invalid privkey from %s shnum %d" %
-                     (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
+                     (server.get_name(), shnum),
+                     parent=lp, level=log.WEIRD, umid="aJVccw")
             return
 
         # it's good
-        self.log("got valid privkey from shnum %d on peerid %s" %
-                 (shnum, idlib.shortnodeid_b2a(peerid)))
+        self.log("got valid privkey from shnum %d on serverid %s" %
+                 (shnum, server.get_name()),
+                 parent=lp)
         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
         self._node._populate_encprivkey(enc_privkey)
         self._node._populate_privkey(privkey)
         self._need_privkey = False
-        self._status.set_privkey_from(peerid)
+        self._status.set_privkey_from(server)
+
 
+    def _add_lease_failed(self, f, server, storage_index):
+        # Older versions of Tahoe didn't handle the add-lease message very
+        # well: <=1.1.0 throws a NameError because it doesn't implement
+        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
+        # (which is most of them, since we send add-lease to everybody,
+        # before we know whether or not they have any shares for us), and
+        # 1.2.0 throws KeyError even on known buckets due to an internal bug
+        # in the latency-measuring code.
 
-    def _query_failed(self, f, peerid):
-        self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)
+        # we want to ignore the known-harmless errors and log the others. In
+        # particular we want to log any local errors caused by coding
+        # problems.
+
+        if f.check(DeadReferenceError):
+            return
+        if f.check(RemoteException):
+            if f.value.failure.check(KeyError, IndexError, NameError):
+                # this may ignore a bit too much, but that only hurts us
+                # during debugging
+                return
+            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
+                     name=server.get_name(),
+                     f_value=str(f.value),
+                     failure=f,
+                     level=log.WEIRD, umid="iqg3mw")
+            return
+        # local errors are cause for alarm
+        log.err(f,
+                format="local error in add_lease to [%(name)s]: %(f_value)s",
+                name=server.get_name(),
+                f_value=str(f.value),
+                level=log.WEIRD, umid="ZWh6HA")
+
+    def _query_failed(self, f, server):
         if not self._running:
             return
-        self._must_query.discard(peerid)
-        self._queries_outstanding.discard(peerid)
-        self._bad_peers.add(peerid)
-        self._servermap.problems.append(f)
-        self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
+        level = log.WEIRD
+        if f.check(DeadReferenceError):
+            level = log.UNUSUAL
+        self.log(format="error during query: %(f_value)s",
+                 f_value=str(f.value), failure=f,
+                 level=level, umid="IHXuQg")
+        self._must_query.discard(server)
+        self._queries_outstanding.discard(server)
+        self._bad_servers.add(server)
+        self._servermap.add_problem(f)
+        # a server could be in both ServerMap.reachable_servers and
+        # .unreachable_servers if they responded to our query, but then an
+        # exception was raised in _got_results.
+        self._servermap.mark_server_unreachable(server)
         self._queries_completed += 1
         self._last_failure = f
 
-    def _got_privkey_results(self, datavs, peerid, shnum, started):
-        now = time.time()
-        elapsed = now - started
-        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
-        self._queries_outstanding.discard(peerid)
-        if not self._need_privkey:
-            return
-        if shnum not in datavs:
-            self.log("privkey wasn't there when we asked it", level=log.WEIRD)
-            return
-        datav = datavs[shnum]
-        enc_privkey = datav[0]
-        self._try_to_validate_privkey(enc_privkey, peerid, shnum)
-
-    def _privkey_query_failed(self, f, peerid, shnum):
-        self._queries_outstanding.discard(peerid)
-        self.log("error during privkey query: %s %s" % (f, f.value),
-                 level=log.WEIRD)
+
+    def _privkey_query_failed(self, f, server, shnum, lp):
+        self._queries_outstanding.discard(server)
         if not self._running:
             return
-        self._queries_outstanding.discard(peerid)
-        self._servermap.problems.append(f)
+        level = log.WEIRD
+        if f.check(DeadReferenceError):
+            level = log.UNUSUAL
+        self.log(format="error during privkey query: %(f_value)s",
+                 f_value=str(f.value), failure=f,
+                 parent=lp, level=level, umid="McoJ5w")
+        self._servermap.add_problem(f)
         self._last_failure = f
 
+
     def _check_for_done(self, res):
         # exit paths:
         #  return self._send_more_queries(outstanding) : send some more queries
         #  return self._done() : all done
         #  return : keep waiting, no new queries
-
         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                               "%(outstanding)d queries outstanding, "
-                              "%(extra)d extra peers available, "
-                              "%(must)d 'must query' peers left"
+                              "%(extra)d extra servers available, "
+                              "%(must)d 'must query' servers left, "
+                              "need_privkey=%(need_privkey)s"
                               ),
                       mode=self.mode,
                       outstanding=len(self._queries_outstanding),
-                      extra=len(self.extra_peers),
+                      extra=len(self.extra_servers),
                       must=len(self._must_query),
+                      need_privkey=self._need_privkey,
                       level=log.NOISY,
                       )
 
@@ -715,17 +1027,17 @@ class ServermapUpdater:
             return
 
         if self._must_query:
-            # we are still waiting for responses from peers that used to have
+            # we are still waiting for responses from servers that used to have
             # a share, so we must continue to wait. No additional queries are
             # required at this time.
-            self.log("%d 'must query' peers left" % len(self._must_query),
-                     parent=lp)
+            self.log("%d 'must query' servers left" % len(self._must_query),
+                     level=log.NOISY, parent=lp)
             return
 
-        if (not self._queries_outstanding and not self.extra_peers):
-            # all queries have retired, and we have no peers left to ask. No
+        if (not self._queries_outstanding and not self.extra_servers):
+            # all queries have retired, and we have no servers left to ask. No
             # more progress can be made, therefore we are done.
-            self.log("all queries are retired, no extra peers: done",
+            self.log("all queries are retired, no extra servers: done",
                      parent=lp)
             return self._done()
 
@@ -741,7 +1053,7 @@ class ServermapUpdater:
                          parent=lp)
                 return self._done()
 
-        if self.mode == MODE_CHECK:
+        if self.mode in (MODE_CHECK, MODE_REPAIR):
             # we used self._must_query, and we know there aren't any
             # responses still waiting, so that means we must be done
             self.log("done", parent=lp)
@@ -753,15 +1065,15 @@ class ServermapUpdater:
             # version, and we haven't seen any unrecoverable higher-seqnum'ed
             # versions, then we're done.
 
-            if self._queries_completed < self.num_peers_to_query:
+            if self._queries_completed < self.num_servers_to_query:
                 self.log(format="%(completed)d completed, %(query)d to query: need more",
                          completed=self._queries_completed,
-                         query=self.num_peers_to_query,
-                         parent=lp)
+                         query=self.num_servers_to_query,
+                         level=log.NOISY, parent=lp)
                 return self._send_more_queries(MAX_IN_FLIGHT)
             if not recoverable_versions:
                 self.log("no recoverable versions: need more",
-                         parent=lp)
+                         level=log.NOISY, parent=lp)
                 return self._send_more_queries(MAX_IN_FLIGHT)
             highest_recoverable = max(recoverable_versions)
             highest_recoverable_seqnum = highest_recoverable[0]
@@ -771,7 +1083,8 @@ class ServermapUpdater:
                     # don't yet see enough shares to recover it. Try harder.
                     # TODO: consider sending more queries.
                     # TODO: consider limiting the search distance
-                    self.log("evidence of higher seqnum: need more")
+                    self.log("evidence of higher seqnum: need more",
+                             level=log.UNUSUAL, parent=lp)
                     return self._send_more_queries(MAX_IN_FLIGHT)
             # all the unrecoverable versions were old or concurrent with a
             # recoverable version. Good enough.
@@ -785,7 +1098,8 @@ class ServermapUpdater:
             # every server in the world.
 
             if not recoverable_versions:
-                self.log("no recoverable versions: need more", parent=lp)
+                self.log("no recoverable versions: need more", parent=lp,
+                         level=log.NOISY)
                 return self._send_more_queries(MAX_IN_FLIGHT)
 
             last_found = -1
@@ -795,34 +1109,34 @@ class ServermapUpdater:
             states = []
             found_boundary = False
 
-            for i,(peerid,ss) in enumerate(self.full_peerlist):
-                if peerid in self._bad_peers:
+            for i,server in enumerate(self.full_serverlist):
+                if server in self._bad_servers:
                     # query failed
                     states.append("x")
-                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
-                elif peerid in self._empty_peers:
+                    #self.log("loop [%s]: x" % server.get_name()
+                elif server in self._empty_servers:
                     # no shares
                     states.append("0")
-                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
+                    #self.log("loop [%s]: 0" % server.get_name()
                     if last_found != -1:
                         num_not_found += 1
                         if num_not_found >= self.EPSILON:
                             self.log("found our boundary, %s" %
                                      "".join(states),
-                                     parent=lp)
+                                     parent=lp, level=log.NOISY)
                             found_boundary = True
                             break
 
-                elif peerid in self._good_peers:
+                elif server in self._good_servers:
                     # yes shares
                     states.append("1")
-                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
+                    #self.log("loop [%s]: 1" % server.get_name()
                     last_found = i
                     num_not_found = 0
                 else:
                     # not responded yet
                     states.append("?")
-                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
+                    #self.log("loop [%s]: ?" % server.get_name()
                     last_not_responded = i
                     num_not_responded += 1
 
@@ -832,11 +1146,11 @@ class ServermapUpdater:
                 if last_not_responded == -1:
                     # we're done
                     self.log("have all our answers",
-                             parent=lp)
+                             parent=lp, level=log.NOISY)
                     # .. unless we're still waiting on the privkey
                     if self._need_privkey:
                         self.log("but we're still waiting for the privkey",
-                                 parent=lp)
+                                 parent=lp, level=log.NOISY)
                         # if we found the boundary but we haven't yet found
                         # the privkey, we may need to look further. If
                         # somehow all the privkeys were corrupted (but the
@@ -848,14 +1162,15 @@ class ServermapUpdater:
                 return self._send_more_queries(num_not_responded)
 
             # if we hit here, we didn't find our boundary, so we're still
-            # waiting for peers
-            self.log("no boundary yet, %s" % "".join(states), parent=lp)
+            # waiting for servers
+            self.log("no boundary yet, %s" % "".join(states), parent=lp,
+                     level=log.NOISY)
             return self._send_more_queries(MAX_IN_FLIGHT)
 
         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
         # arbitrary, really I want this to be something like k -
         # max(known_version_sharecounts) + some extra
-        self.log("catchall: need more", parent=lp)
+        self.log("catchall: need more", parent=lp, level=log.NOISY)
         return self._send_more_queries(MAX_IN_FLIGHT)
 
     def _send_more_queries(self, num_outstanding):
@@ -868,22 +1183,22 @@ class ServermapUpdater:
             active_queries = len(self._queries_outstanding) + len(more_queries)
             if active_queries >= num_outstanding:
                 break
-            if not self.extra_peers:
+            if not self.extra_servers:
                 break
-            more_queries.append(self.extra_peers.pop(0))
+            more_queries.append(self.extra_servers.pop(0))
 
         self.log(format="sending %(more)d more queries: %(who)s",
                  more=len(more_queries),
-                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
-                               for (peerid,ss) in more_queries]),
+                 who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
                  level=log.NOISY)
 
-        for (peerid, ss) in more_queries:
-            self._do_query(ss, peerid, self._storage_index, self._read_size)
+        for server in more_queries:
+            self._do_query(server, self._storage_index, self._read_size)
             # we'll retrigger when those queries come back
 
     def _done(self):
         if not self._running:
+            self.log("not running; we're already done")
             return
         self._running = False
         now = time.time()
@@ -891,17 +1206,17 @@ class ServermapUpdater:
         self._status.set_finished(now)
         self._status.timings["total"] = elapsed
         self._status.set_progress(1.0)
-        self._status.set_status("Done")
+        self._status.set_status("Finished")
         self._status.set_active(False)
 
-        self._servermap.last_update_mode = self.mode
-        self._servermap.last_update_time = self._started
+        self._servermap.set_last_update(self.mode, self._started)
         # the servermap will not be touched after this
         self.log("servermap: %s" % self._servermap.summarize_versions())
+
         eventually(self._done_deferred.callback, self._servermap)
 
     def _fatal_error(self, f):
-        self.log("fatal error", failure=f, level=log.WEIRD)
+        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
         self._done_deferred.errback(f)