Remove ResponseCache in favor of MDMFSlotReadProxy's cache. closes #1240.
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index 1f28cdd263a822c2f313bab354d8ddf333a3bf69..4ef85c583f1f60d86ba6610b405346cb49e6bd26 100644
--- a/src/allmydata/mutable/servermap.py
+++ b/src/allmydata/mutable/servermap.py
@@ -6,14 +6,14 @@ from twisted.internet import defer
 from twisted.python import failure
 from foolscap.api import DeadReferenceError, RemoteException, eventually, \
                          fireEventually
-from allmydata.util import base32, hashutil, idlib, log, deferredutil
+from allmydata.util import base32, hashutil, log, deferredutil
 from allmydata.util.dictutil import DictOfSets
 from allmydata.storage.server import si_b2a
 from allmydata.interfaces import IServermapUpdaterStatus
 from pycryptopp.publickey import rsa
 
-from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
-     CorruptShareError
+from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
+     MODE_READ, MODE_REPAIR, CorruptShareError
 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
 
 class UpdateStatus:
@@ -34,11 +34,11 @@ class UpdateStatus:
         self.started = time.time()
         self.finished = None
 
-    def add_per_server_time(self, peerid, op, sent, elapsed):
+    def add_per_server_time(self, server, op, sent, elapsed):
         assert op in ("query", "late", "privkey")
-        if peerid not in self.timings["per_server"]:
-            self.timings["per_server"][peerid] = []
-        self.timings["per_server"][peerid].append((op,sent,elapsed))
+        if server not in self.timings["per_server"]:
+            self.timings["per_server"][server] = []
+        self.timings["per_server"][server].append((op,sent,elapsed))
 
     def get_started(self):
         return self.started
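
The per-server timing table is an ordinary dict of lists. A minimal sketch of how it accumulates entries (a plain string stands in for the IServer object the new code passes as the key):

# Sketch only, not Tahoe code.
timings = {"per_server": {}}

def add_per_server_time(server, op, sent, elapsed):
    assert op in ("query", "late", "privkey")
    timings["per_server"].setdefault(server, []).append((op, sent, elapsed))

add_per_server_time("server-a", "query", 0.0, 0.12)
add_per_server_time("server-a", "late", 0.0, 2.50)
# timings["per_server"] == {"server-a": [("query", 0.0, 0.12), ("late", 0.0, 2.5)]}
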
@@ -69,8 +69,8 @@ class UpdateStatus:
         self.storage_index = si
     def set_mode(self, mode):
         self.mode = mode
-    def set_privkey_from(self, peerid):
-        self.privkey_from = peerid
+    def set_privkey_from(self, server):
+        self.privkey_from = server
     def set_status(self, status):
         self.status = status
     def set_progress(self, value):
@@ -95,47 +95,57 @@ class ServerMap:
     has changed since I last retrieved this data'. This reduces the chances
     of clobbering a simultaneous (uncoordinated) write.
 
-    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
-                     (versionid, timestamp) tuple. Each 'versionid' is a
-                     tuple of (seqnum, root_hash, IV, segsize, datalength,
-                     k, N, signed_prefix, offsets)
-
-    @ivar connections: maps peerid to a RemoteReference
-
-    @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
-                      shares that I should ignore (because a previous user of
-                      the servermap determined that they were invalid). The
-                      updater only locates a certain number of shares: if
-                      some of these turn out to have integrity problems and
-                      are unusable, the caller will need to mark those shares
-                      as bad, then re-update the servermap, then try again.
-                      The dict maps (peerid, shnum) tuple to old checkstring.
+    @ivar _known_shares: a dictionary, mapping a (server, shnum) tuple to a
+                         (versionid, timestamp) tuple. Each 'versionid' is a
+                         tuple of (seqnum, root_hash, IV, segsize, datalength,
+                         k, N, signed_prefix, offsets)
+
+    @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
+                       shares that I should ignore (because a previous user
+                       of the servermap determined that they were invalid).
+                       The updater only locates a certain number of shares:
+                       if some of these turn out to have integrity problems
+                       and are unusable, the caller will need to mark those
+                       shares as bad, then re-update the servermap, then try
+                       again. The dict maps (server, shnum) tuple to old
+                       checkstring.
     """
 
     def __init__(self):
-        self.servermap = {}
-        self.connections = {}
-        self.unreachable_peers = set() # peerids that didn't respond to queries
-        self.reachable_peers = set() # peerids that did respond to queries
-        self.problems = [] # mostly for debugging
-        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
-        self.last_update_mode = None
-        self.last_update_time = 0
-        self.update_data = {} # (verinfo,shnum) => data
+        self._known_shares = {}
+        self.unreachable_servers = set() # servers that didn't respond to queries
+        self.reachable_servers = set() # servers that did respond to queries
+        self._problems = [] # mostly for debugging
+        self._bad_shares = {} # maps (server,shnum) to old checkstring
+        self._last_update_mode = None
+        self._last_update_time = 0
+        self.proxies = {}
+        self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
+        # where blockhashes is a list of bytestrings (the result of
+        # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
+        # (block,salt) tuple-of-bytestrings from get_block_and_salt()
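
For orientation, a sketch of the shapes stored by this class; every value below is made up, and a string stands in for the IServer object used as the first key element:

import time

# Hypothetical verinfo tuple: (seqnum, root_hash, IV, segsize, datalength,
# k, N, signed_prefix, offsets_tuple). All byte values are fake.
verinfo = (3, b"R" * 32, b"I" * 16, 131072, 1000000, 3, 10, b"prefix",
           (("share_data", 1024),))

known_shares = {}                       # (server, shnum) -> (verinfo, timestamp)
known_shares[("server-a", 0)] = (verinfo, time.time())
known_shares[("server-b", 7)] = (verinfo, time.time())
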
 
     def copy(self):
         s = ServerMap()
-        s.servermap = self.servermap.copy() # tuple->tuple
-        s.connections = self.connections.copy() # str->RemoteReference
-        s.unreachable_peers = set(self.unreachable_peers)
-        s.reachable_peers = set(self.reachable_peers)
-        s.problems = self.problems[:]
-        s.bad_shares = self.bad_shares.copy() # tuple->str
-        s.last_update_mode = self.last_update_mode
-        s.last_update_time = self.last_update_time
+        s._known_shares = self._known_shares.copy() # tuple->tuple
+        s.unreachable_servers = set(self.unreachable_servers)
+        s.reachable_servers = set(self.reachable_servers)
+        s._problems = self._problems[:]
+        s._bad_shares = self._bad_shares.copy() # tuple->str
+        s._last_update_mode = self._last_update_mode
+        s._last_update_time = self._last_update_time
         return s
 
-    def mark_bad_share(self, peerid, shnum, checkstring):
+    def get_reachable_servers(self):
+        return self.reachable_servers
+
+    def mark_server_reachable(self, server):
+        self.reachable_servers.add(server)
+
+    def mark_server_unreachable(self, server):
+        self.unreachable_servers.add(server)
+
+    def mark_bad_share(self, server, shnum, checkstring):
         """This share was found to be bad, either in the checkstring or
         signature (detected during mapupdate), or deeper in the share
         (detected at retrieve time). Remove it from our list of useful
@@ -144,70 +154,84 @@ class ServerMap:
         corrupted or badly signed) so that a repair operation can do the
         test-and-set using it as a reference.
         """
-        key = (peerid, shnum) # record checkstring
-        self.bad_shares[key] = checkstring
-        self.servermap.pop(key, None)
+        key = (server, shnum) # record checkstring
+        self._bad_shares[key] = checkstring
+        self._known_shares.pop(key, None)
 
-    def add_new_share(self, peerid, shnum, verinfo, timestamp):
+    def get_bad_shares(self):
+        # key=(server,shnum) -> checkstring
+        return self._bad_shares
+
+    def add_new_share(self, server, shnum, verinfo, timestamp):
         """We've written a new share out, replacing any that was there
         before."""
-        key = (peerid, shnum)
-        self.bad_shares.pop(key, None)
-        self.servermap[key] = (verinfo, timestamp)
+        key = (server, shnum)
+        self._bad_shares.pop(key, None)
+        self._known_shares[key] = (verinfo, timestamp)
+
+    def add_problem(self, f):
+        self._problems.append(f)
+    def get_problems(self):
+        return self._problems
+
+    def set_last_update(self, mode, when):
+        self._last_update_mode = mode
+        self._last_update_time = when
+    def get_last_update(self):
+        return (self._last_update_mode, self._last_update_time)
 
     def dump(self, out=sys.stdout):
         print >>out, "servermap:"
 
-        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
+        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
-                          (idlib.shortnodeid_b2a(peerid), shnum,
+                          (server.get_name(), shnum,
                            seqnum, base32.b2a(root_hash)[:4], k, N,
                            datalength))
-        if self.problems:
-            print >>out, "%d PROBLEMS" % len(self.problems)
-            for f in self.problems:
+        if self._problems:
+            print >>out, "%d PROBLEMS" % len(self._problems)
+            for f in self._problems:
                 print >>out, str(f)
         return out
 
-    def all_peers(self):
-        return set([peerid
-                    for (peerid, shnum)
-                    in self.servermap])
+    def all_servers(self):
+        return set([server for (server, shnum) in self._known_shares])
 
-    def all_peers_for_version(self, verinfo):
-        """Return a set of peerids that hold shares for the given version."""
-        return set([peerid
-                    for ( (peerid, shnum), (verinfo2, timestamp) )
-                    in self.servermap.items()
+    def all_servers_for_version(self, verinfo):
+        """Return a set of servers that hold shares for the given version."""
+        return set([server
+                    for ( (server, shnum), (verinfo2, timestamp) )
+                    in self._known_shares.items()
                     if verinfo == verinfo2])
 
+    def get_known_shares(self):
+        # maps (server,shnum) to (versionid,timestamp)
+        return self._known_shares
+
     def make_sharemap(self):
-        """Return a dict that maps shnum to a set of peerds that hold it."""
+        """Return a dict that maps shnum to a set of servers that hold it."""
         sharemap = DictOfSets()
-        for (peerid, shnum) in self.servermap:
-            sharemap.add(shnum, peerid)
+        for (server, shnum) in self._known_shares:
+            sharemap.add(shnum, server)
         return sharemap
 
     def make_versionmap(self):
-        """Return a dict that maps versionid to sets of (shnum, peerid,
+        """Return a dict that maps versionid to sets of (shnum, server,
         timestamp) tuples."""
         versionmap = DictOfSets()
-        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
-            versionmap.add(verinfo, (shnum, peerid, timestamp))
+        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
+            versionmap.add(verinfo, (shnum, server, timestamp))
         return versionmap
 
-    def shares_on_peer(self, peerid):
-        return set([shnum
-                    for (s_peerid, shnum)
-                    in self.servermap
-                    if s_peerid == peerid])
+    def debug_shares_on_server(self, server): # used by tests
+        return set([shnum for (s, shnum) in self._known_shares if s == server])
 
-    def version_on_peer(self, peerid, shnum):
-        key = (peerid, shnum)
-        if key in self.servermap:
-            (verinfo, timestamp) = self.servermap[key]
+    def version_on_server(self, server, shnum):
+        key = (server, shnum)
+        if key in self._known_shares:
+            (verinfo, timestamp) = self._known_shares[key]
             return verinfo
         return None
 
@@ -218,7 +242,7 @@ class ServerMap:
         all_shares = {}
         for verinfo, shares in versionmap.items():
             s = set()
-            for (shnum, peerid, timestamp) in shares:
+            for (shnum, server, timestamp) in shares:
                 s.add(shnum)
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
@@ -244,7 +268,7 @@ class ServerMap:
         bits = []
         for (verinfo, shares) in versionmap.items():
             vstr = self.summarize_version(verinfo)
-            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            shnums = set([shnum for (shnum, server, timestamp) in shares])
             bits.append("%d*%s" % (len(shnums), vstr))
         return "/".join(bits)
 
@@ -256,7 +280,7 @@ class ServerMap:
         for (verinfo, shares) in versionmap.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
-            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            shnums = set([shnum for (shnum, server, timestamp) in shares])
             if len(shnums) >= k:
                 # this one is recoverable
                 recoverable_versions.add(verinfo)
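
Recoverability depends only on how many distinct share numbers are present, not on how many servers hold them: k distinct shares are enough to decode. A small illustration:

k = 3
shares = {(0, "server-a", 0.0), (1, "server-a", 0.0),
          (0, "server-b", 0.0), (2, "server-c", 0.0)}
shnums = set(shnum for (shnum, server, timestamp) in shares)
recoverable = len(shnums) >= k   # True: shnums {0, 1, 2} suffice to decode
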
@@ -272,7 +296,7 @@ class ServerMap:
         for (verinfo, shares) in versionmap.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
-            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            shnums = set([shnum for (shnum, server, timestamp) in shares])
             if len(shnums) < k:
                 unrecoverable_versions.add(verinfo)
 
@@ -306,7 +330,7 @@ class ServerMap:
         for (verinfo, shares) in versionmap.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = verinfo
-            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            shnums = set([shnum for (shnum, server, timestamp) in shares])
             healths[verinfo] = (len(shnums),k)
             if len(shnums) < k:
                 unrecoverable.add(verinfo)
@@ -403,7 +427,7 @@ class ServermapUpdater:
             self._read_size = 1000
         self._need_privkey = False
 
-        if mode == MODE_WRITE and not self._node.get_privkey():
+        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
             self._need_privkey = True
         # check+repair: repair requires the privkey, so if we didn't happen
         # to ask for it during the check, we'll have problems doing the
@@ -445,29 +469,21 @@ class ServermapUpdater:
         # avoid re-checking the signatures for each share.
         self._valid_versions = set()
 
-        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
-        # timestamp) tuples. This is used to figure out which versions might
-        # be retrievable, and to make the eventual data download faster.
-        self.versionmap = DictOfSets()
-
         self._done_deferred = defer.Deferred()
 
-        # first, which peers should be talk to? Any that were in our old
+        # first, which servers should we talk to? Any that were in our old
         # servermap, plus "enough" others.
 
         self._queries_completed = 0
 
         sb = self._storage_broker
-        # All of the peers, permuted by the storage index, as usual.
-        full_peerlist = [(s.get_serverid(), s.get_rref())
-                         for s in sb.get_servers_for_psi(self._storage_index)]
-        self.full_peerlist = full_peerlist # for use later, immutable
-        self.extra_peers = full_peerlist[:] # peers are removed as we use them
-        self._good_peers = set() # peers who had some shares
-        self._empty_peers = set() # peers who don't have any shares
-        self._bad_peers = set() # peers to whom our queries failed
-        self._readers = {} # peerid -> dict(sharewriters), filled in
-                           # after responses come in.
+        # All of the servers, permuted by the storage index, as usual.
+        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
+        self.full_serverlist = full_serverlist # for use later, immutable
+        self.extra_servers = full_serverlist[:] # servers are removed as we use them
+        self._good_servers = set() # servers who had some shares
+        self._empty_servers = set() # servers who don't have any shares
+        self._bad_servers = set() # servers to whom our queries failed
 
         k = self._node.get_required_shares()
         # For what cases can these conditions work?
@@ -478,23 +494,23 @@ class ServermapUpdater:
         if N is None:
             N = 10
         self.EPSILON = k
-        # we want to send queries to at least this many peers (although we
+        # we want to send queries to at least this many servers (although we
         # might not wait for all of their answers to come back)
-        self.num_peers_to_query = k + self.EPSILON
+        self.num_servers_to_query = k + self.EPSILON
 
-        if self.mode == MODE_CHECK:
-            # We want to query all of the peers.
-            initial_peers_to_query = dict(full_peerlist)
-            must_query = set(initial_peers_to_query.keys())
-            self.extra_peers = []
+        if self.mode in (MODE_CHECK, MODE_REPAIR):
+            # We want to query all of the servers.
+            initial_servers_to_query = list(full_serverlist)
+            must_query = set(initial_servers_to_query)
+            self.extra_servers = []
         elif self.mode == MODE_WRITE:
             # we're planning to replace all the shares, so we want a good
             # chance of finding them all. We will keep searching until we've
             # seen epsilon that don't have a share.
-            # We don't query all of the peers because that could take a while.
-            self.num_peers_to_query = N + self.EPSILON
-            initial_peers_to_query, must_query = self._build_initial_querylist()
-            self.required_num_empty_peers = self.EPSILON
+            # We don't query all of the servers because that could take a while.
+            self.num_servers_to_query = N + self.EPSILON
+            initial_servers_to_query, must_query = self._build_initial_querylist()
+            self.required_num_empty_servers = self.EPSILON
 
             # TODO: arrange to read lots of data from k-ish servers, to avoid
             # the extra round trip required to read large directories. This
@@ -502,55 +518,49 @@ class ServermapUpdater:
             # private key.
 
         else: # MODE_READ, MODE_ANYTHING
-            # 2k peers is good enough.
-            initial_peers_to_query, must_query = self._build_initial_querylist()
-
-        # this is a set of peers that we are required to get responses from:
-        # they are peers who used to have a share, so we need to know where
-        # they currently stand, even if that means we have to wait for a
-        # silently-lost TCP connection to time out. We remove peers from this
-        # set as we get responses.
-        self._must_query = must_query
-
-        # now initial_peers_to_query contains the peers that we should ask,
-        # self.must_query contains the peers that we must have heard from
-        # before we can consider ourselves finished, and self.extra_peers
-        # contains the overflow (peers that we should tap if we don't get
-        # enough responses)
+            # 2*k servers is good enough.
+            initial_servers_to_query, must_query = self._build_initial_querylist()
+
+        # this is a set of servers that we are required to get responses
+        # from: they are servers who used to have a share, so we need to know
+        # where they currently stand, even if that means we have to wait for
+        # a silently-lost TCP connection to time out. We remove servers from
+        # this set as we get responses.
+        self._must_query = set(must_query)
+
+        # now initial_servers_to_query contains the servers that we should
+        # ask, self.must_query contains the servers that we must have heard
+        # from before we can consider ourselves finished, and
+        # self.extra_servers contains the overflow (servers that we should
+        # tap if we don't get enough responses)
         # I guess that self._must_query is a subset of
-        # initial_peers_to_query?
-        assert set(must_query).issubset(set(initial_peers_to_query))
+        # initial_servers_to_query?
+        assert must_query.issubset(initial_servers_to_query)
 
-        self._send_initial_requests(initial_peers_to_query)
+        self._send_initial_requests(initial_servers_to_query)
         self._status.timings["initial_queries"] = time.time() - self._started
         return self._done_deferred
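
To summarize the mode handling in update() (EPSILON is set to k just above): MODE_READ and MODE_ANYTHING aim for roughly k + EPSILON queries, MODE_WRITE for N + EPSILON, and MODE_CHECK/MODE_REPAIR query every known server. A sketch of the arithmetic with assumed k and N:

k, N = 3, 10            # assumed encoding parameters
EPSILON = k
num_servers_to_query = {
    "MODE_READ":     k + EPSILON,   # 6: enough to find a recoverable version
    "MODE_ANYTHING": k + EPSILON,   # 6
    "MODE_WRITE":    N + EPSILON,   # 13: try hard to locate every share
}
# MODE_CHECK and MODE_REPAIR ignore the count and query every known server.
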
 
     def _build_initial_querylist(self):
-        initial_peers_to_query = {}
-        must_query = set()
-        for peerid in self._servermap.all_peers():
-            ss = self._servermap.connections[peerid]
-            # we send queries to everyone who was already in the sharemap
-            initial_peers_to_query[peerid] = ss
-            # and we must wait for responses from them
-            must_query.add(peerid)
-
-        while ((self.num_peers_to_query > len(initial_peers_to_query))
-               and self.extra_peers):
-            (peerid, ss) = self.extra_peers.pop(0)
-            initial_peers_to_query[peerid] = ss
-
-        return initial_peers_to_query, must_query
-
-    def _send_initial_requests(self, peerlist):
-        self._status.set_status("Sending %d initial queries" % len(peerlist))
+        # we send queries to everyone who was already in the sharemap
+        initial_servers_to_query = set(self._servermap.all_servers())
+        # and we must wait for responses from them
+        must_query = set(initial_servers_to_query)
+
+        while ((self.num_servers_to_query > len(initial_servers_to_query))
+               and self.extra_servers):
+            initial_servers_to_query.add(self.extra_servers.pop(0))
+
+        return initial_servers_to_query, must_query
+
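
A standalone sketch of the selection rule above: every server already in the old map gets queried (and must answer), then the permuted extra_servers list is consumed until num_servers_to_query is reached:

def build_initial_querylist(known_servers, extra_servers, num_to_query):
    initial = set(known_servers)       # everyone already in the sharemap
    must_query = set(initial)          # ...and we must hear back from them
    while num_to_query > len(initial) and extra_servers:
        initial.add(extra_servers.pop(0))
    return initial, must_query

initial, must = build_initial_querylist({"s1"}, ["s2", "s3", "s4"], 3)
# initial == {"s1", "s2", "s3"}, must == {"s1"}
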
+    def _send_initial_requests(self, serverlist):
+        self._status.set_status("Sending %d initial queries" % len(serverlist))
         self._queries_outstanding = set()
-        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
-        for (peerid, ss) in peerlist.items():
-            self._queries_outstanding.add(peerid)
-            self._do_query(ss, peerid, self._storage_index, self._read_size)
+        for server in serverlist:
+            self._queries_outstanding.add(server)
+            self._do_query(server, self._storage_index, self._read_size)
 
-        if not peerlist:
+        if not serverlist:
             # there is nobody to ask, so we need to short-circuit the state
             # machine.
             d = defer.maybeDeferred(self._check_for_done, None)
@@ -561,18 +571,17 @@ class ServermapUpdater:
         # might produce a result.
         return None
 
-    def _do_query(self, ss, peerid, storage_index, readsize):
-        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
-                 peerid=idlib.shortnodeid_b2a(peerid),
+    def _do_query(self, server, storage_index, readsize):
+        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
+                 name=server.get_name(),
                  readsize=readsize,
                  level=log.NOISY)
-        self._servermap.connections[peerid] = ss
         started = time.time()
-        self._queries_outstanding.add(peerid)
-        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
-        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
+        self._queries_outstanding.add(server)
+        d = self._do_read(server, storage_index, [], [(0, readsize)])
+        d.addCallback(self._got_results, server, readsize, storage_index,
                       started)
-        d.addErrback(self._query_failed, peerid)
+        d.addErrback(self._query_failed, server)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
         d.addErrback(log.err)
@@ -580,24 +589,25 @@ class ServermapUpdater:
         d.addCallback(self._check_for_done)
         return d
 
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
+    def _do_read(self, server, storage_index, shnums, readv):
+        ss = server.get_rref()
         if self._add_lease:
             # send an add-lease message in parallel. The results are handled
             # separately. This is sent before the slot_readv() so that we can
             # be sure the add_lease is retired by the time slot_readv comes
             # back (this relies upon our knowledge that the server code for
             # add_lease is synchronous).
-            renew_secret = self._node.get_renewal_secret(peerid)
-            cancel_secret = self._node.get_cancel_secret(peerid)
+            renew_secret = self._node.get_renewal_secret(server)
+            cancel_secret = self._node.get_cancel_secret(server)
             d2 = ss.callRemote("add_lease", storage_index,
                                renew_secret, cancel_secret)
             # we ignore success
-            d2.addErrback(self._add_lease_failed, peerid, storage_index)
+            d2.addErrback(self._add_lease_failed, server, storage_index)
         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
         return d
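
The initial query reads the same small window from every share a server holds. A sketch of the arguments built by _do_query/_do_read; the empty share-number list is assumed here to mean "all shares for this storage index":

readsize = 1000            # example value; the updater picks this per mode
shnums = []                # no share-number filter: read whatever is there
readv = [(0, readsize)]    # a single (offset, length) pair: the share's head
# passed as: ss.callRemote("slot_readv", storage_index, shnums, readv)
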
 
 
-    def _got_corrupt_share(self, e, shnum, peerid, data, lp):
+    def _got_corrupt_share(self, e, shnum, server, data, lp):
         """
         I am called when a remote server returns a corrupt share in
         response to one of our queries. By corrupt, I mean a share
@@ -608,57 +618,44 @@ class ServermapUpdater:
         self.log(format="bad share: %(f_value)s", f_value=str(f),
                  failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
         # Notify the server that its share is corrupt.
-        self.notify_server_corruption(peerid, shnum, str(e))
-        # By flagging this as a bad peer, we won't count any of
-        # the other shares on that peer as valid, though if we
+        self.notify_server_corruption(server, shnum, str(e))
+        # By flagging this as a bad server, we won't count any of
+        # the other shares on that server as valid, though if we
         # happen to find a valid version string amongst those
         # shares, we'll keep track of it so that we don't need
         # to validate the signature on those again.
-        self._bad_peers.add(peerid)
+        self._bad_servers.add(server)
         self._last_failure = f
         # XXX: Use the reader for this?
         checkstring = data[:SIGNED_PREFIX_LENGTH]
-        self._servermap.mark_bad_share(peerid, shnum, checkstring)
-        self._servermap.problems.append(f)
-
-
-    def _cache_good_sharedata(self, verinfo, shnum, now, data):
-        """
-        If one of my queries returns successfully (which means that we
-        were able to and successfully did validate the signature), I
-        cache the data that we initially fetched from the storage
-        server. This will help reduce the number of roundtrips that need
-        to occur when the file is downloaded, or when the file is
-        updated.
-        """
-        if verinfo:
-            self._node._add_to_cache(verinfo, shnum, 0, data)
+        self._servermap.mark_bad_share(server, shnum, checkstring)
+        self._servermap.add_problem(f)
 
 
-    def _got_results(self, datavs, peerid, readsize, stuff, started):
-        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
-                      peerid=idlib.shortnodeid_b2a(peerid),
+    def _got_results(self, datavs, server, readsize, storage_index, started):
+        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
+                      name=server.get_name(),
                       numshares=len(datavs))
+        ss = server.get_rref()
         now = time.time()
         elapsed = now - started
         def _done_processing(ignored=None):
-            self._queries_outstanding.discard(peerid)
-            self._servermap.reachable_peers.add(peerid)
-            self._must_query.discard(peerid)
+            self._queries_outstanding.discard(server)
+            self._servermap.mark_server_reachable(server)
+            self._must_query.discard(server)
             self._queries_completed += 1
         if not self._running:
             self.log("but we're not running, so we'll ignore it", parent=lp)
             _done_processing()
-            self._status.add_per_server_time(peerid, "late", started, elapsed)
+            self._status.add_per_server_time(server, "late", started, elapsed)
             return
-        self._status.add_per_server_time(peerid, "query", started, elapsed)
+        self._status.add_per_server_time(server, "query", started, elapsed)
 
         if datavs:
-            self._good_peers.add(peerid)
+            self._good_servers.add(server)
         else:
-            self._empty_peers.add(peerid)
+            self._empty_servers.add(server)
 
-        ss, storage_index = stuff
         ds = []
 
         for shnum,datav in datavs.items():
@@ -666,8 +663,9 @@ class ServermapUpdater:
             reader = MDMFSlotReadProxy(ss,
                                        storage_index,
                                        shnum,
-                                       data)
-            self._readers.setdefault(peerid, dict())[shnum] = reader
+                                       data,
+                                       data_is_everything=(len(data) < readsize))
+
             # our goal, with each response, is to validate the version
             # information and share data as best we can at this point --
             # we do this by validating the signature. To do this, we
@@ -677,11 +675,11 @@ class ServermapUpdater:
             if not self._node.get_pubkey():
                 # fetch and set the public key.
                 d = reader.get_verification_key()
-                d.addCallback(lambda results, shnum=shnum, peerid=peerid:
-                    self._try_to_set_pubkey(results, peerid, shnum, lp))
+                d.addCallback(lambda results, shnum=shnum:
+                              self._try_to_set_pubkey(results, server, shnum, lp))
                 # XXX: Make self._pubkey_query_failed?
-                d.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
-                    self._got_corrupt_share(error, shnum, peerid, data, lp))
+                d.addErrback(lambda error, shnum=shnum, data=data:
+                             self._got_corrupt_share(error, shnum, server, data, lp))
             else:
                 # we already have the public key.
                 d = defer.succeed(None)
@@ -695,16 +693,16 @@ class ServermapUpdater:
             #   bytes of the share on the storage server, so we
             #   shouldn't need to fetch anything at this step.
             d2 = reader.get_verinfo()
-            d2.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
-                self._got_corrupt_share(error, shnum, peerid, data, lp))
+            d2.addErrback(lambda error, shnum=shnum, data=data:
+                          self._got_corrupt_share(error, shnum, server, data, lp))
             # - Next, we need the signature. For an SDMF share, it is
             #   likely that we fetched this when doing our initial fetch
             #   to get the version information. In MDMF, this lives at
             #   the end of the share, so unless the file is quite small,
             #   we'll need to do a remote fetch to get it.
             d3 = reader.get_signature()
-            d3.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
-                self._got_corrupt_share(error, shnum, peerid, data, lp))
+            d3.addErrback(lambda error, shnum=shnum, data=data:
+                          self._got_corrupt_share(error, shnum, server, data, lp))
             #  Once we have all three of these responses, we can move on
             #  to validating the signature
 
@@ -712,10 +710,10 @@ class ServermapUpdater:
             # fetch it here.
             if self._need_privkey:
                 d4 = reader.get_encprivkey()
-                d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
-                    self._try_to_validate_privkey(results, peerid, shnum, lp))
-                d4.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
-                    self._privkey_query_failed(error, shnum, data, lp))
+                d4.addCallback(lambda results, shnum=shnum:
+                               self._try_to_validate_privkey(results, server, shnum, lp))
+                d4.addErrback(lambda error, shnum=shnum:
+                              self._privkey_query_failed(error, server, shnum, lp))
             else:
                 d4 = defer.succeed(None)
 
@@ -739,16 +737,24 @@ class ServermapUpdater:
                 d5 = defer.succeed(None)
 
             dl = defer.DeferredList([d, d2, d3, d4, d5])
+            def _append_proxy(passthrough, shnum=shnum, reader=reader):
+                # Store the proxy (with its cache) keyed by serverid and
+                # version.
+                _, (_,verinfo), _, _, _ = passthrough
+                verinfo = self._make_verinfo_hashable(verinfo)
+                self._servermap.proxies[(verinfo,
+                                         server.get_serverid(),
+                                         storage_index, shnum)] = reader
+                return passthrough
+            dl.addCallback(_append_proxy)
             dl.addBoth(self._turn_barrier)
-            dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
-                self._got_signature_one_share(results, shnum, peerid, lp))
+            dl.addCallback(lambda results, shnum=shnum:
+                           self._got_signature_one_share(results, shnum, server, lp))
             dl.addErrback(lambda error, shnum=shnum, data=data:
-               self._got_corrupt_share(error, shnum, peerid, data, lp))
-            dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
-                self._cache_good_sharedata(verinfo, shnum, now, data))
+                          self._got_corrupt_share(error, shnum, server, data, lp))
             ds.append(dl)
         # dl is a deferred list that will fire when all of the shares
-        # that we found on this peer are done processing. When dl fires,
+        # that we found on this server are done processing. When dl fires,
         # we know that processing is done, so we can decrement the
         # semaphore-like thing that we incremented earlier.
         dl = defer.DeferredList(ds, fireOnOneErrback=True)
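
The _append_proxy callback is what replaces the old ResponseCache (see the commit message): each MDMFSlotReadProxy keeps the bytes it has already fetched, and the servermap retains it under a key pinning the version, server, storage index and share. A sketch of that key with placeholder values:

verinfo = ("placeholder-verinfo",)   # hashable verinfo tuple
serverid = b"\x00" * 20              # server.get_serverid()
storage_index = b"\x01" * 16
shnum = 0
reader = object()                    # stands in for an MDMFSlotReadProxy

proxies = {}
proxies[(verinfo, serverid, storage_index, shnum)] = reader
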
@@ -775,31 +781,31 @@ class ServermapUpdater:
         return fireEventually(result)
 
 
-    def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
+    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
         if self._node.get_pubkey():
             return # don't go through this again if we don't have to
         fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
         assert len(fingerprint) == 32
         if fingerprint != self._node.get_fingerprint():
-            raise CorruptShareError(peerid, shnum,
-                                "pubkey doesn't match fingerprint")
+            raise CorruptShareError(server, shnum,
+                                    "pubkey doesn't match fingerprint")
         self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
         assert self._node.get_pubkey()
 
 
-    def notify_server_corruption(self, peerid, shnum, reason):
-        ss = self._servermap.connections[peerid]
-        ss.callRemoteOnly("advise_corrupt_share",
-                          "mutable", self._storage_index, shnum, reason)
+    def notify_server_corruption(self, server, shnum, reason):
+        rref = server.get_rref()
+        rref.callRemoteOnly("advise_corrupt_share",
+                            "mutable", self._storage_index, shnum, reason)
 
 
-    def _got_signature_one_share(self, results, shnum, peerid, lp):
+    def _got_signature_one_share(self, results, shnum, server, lp):
         # It is our job to give versioninfo to our caller. We need to
         # raise CorruptShareError if the share is corrupt for any
         # reason, something that our caller will handle.
-        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
+        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                  shnum=shnum,
-                 peerid=idlib.shortnodeid_b2a(peerid),
+                 name=server.get_name(),
                  level=log.NOISY,
                  parent=lp)
         if not self._running:
@@ -809,6 +815,10 @@ class ServermapUpdater:
             return None
 
         _, verinfo, signature, __, ___ = results
+        verinfo = self._make_verinfo_hashable(verinfo[1])
+
+        # This tuple uniquely identifies a share on the grid; we use it
+        # to keep track of the ones that we've already seen.
         (seqnum,
          root_hash,
          saltish,
@@ -817,22 +827,8 @@ class ServermapUpdater:
          k,
          n,
          prefix,
-         offsets) = verinfo[1]
-        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
+         offsets_tuple) = verinfo
 
-        # XXX: This should be done for us in the method, so
-        # presumably you can go in there and fix it.
-        verinfo = (seqnum,
-                   root_hash,
-                   saltish,
-                   segsize,
-                   datalen,
-                   k,
-                   n,
-                   prefix,
-                   offsets_tuple)
-        # This tuple uniquely identifies a share on the grid; we use it
-        # to keep track of the ones that we've already seen.
 
         if verinfo not in self._valid_versions:
             # This is a new version tuple, and we need to validate it
@@ -840,14 +836,14 @@ class ServermapUpdater:
             assert self._node.get_pubkey()
             valid = self._node.get_pubkey().verify(prefix, signature[1])
             if not valid:
-                raise CorruptShareError(peerid, shnum,
+                raise CorruptShareError(server, shnum,
                                         "signature is invalid")
 
         # ok, it's a valid verinfo. Add it to the list of validated
         # versions.
         self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                  % (seqnum, base32.b2a(root_hash)[:4],
-                    idlib.shortnodeid_b2a(peerid), shnum,
+                    server.get_name(), shnum,
                     k, n, segsize, datalen),
                     parent=lp)
         self._valid_versions.add(verinfo)
@@ -857,8 +853,8 @@ class ServermapUpdater:
         # version info again, that its signature checks out and that
         # we're okay to skip the signature-checking step.
 
-        # (peerid, shnum) are bound in the method invocation.
-        if (peerid, shnum) in self._servermap.bad_shares:
+        # (server, shnum) are bound in the method invocation.
+        if (server, shnum) in self._servermap.get_bad_shares():
             # we've been told that the rest of the data in this share is
             # unusable, so don't add it to the servermap.
             self.log("but we've been told this is a bad share",
@@ -867,19 +863,11 @@ class ServermapUpdater:
 
         # Add the info to our servermap.
         timestamp = time.time()
-        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
-        # and the versionmap
-        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
+        self._servermap.add_new_share(server, shnum, verinfo, timestamp)
 
         return verinfo
 
-
-    def _got_update_results_one_share(self, results, share):
-        """
-        I record the update results in results.
-        """
-        assert len(results) == 4
-        verinfo, blockhashes, start, end = results
+    def _make_verinfo_hashable(self, verinfo):
         (seqnum,
          root_hash,
          saltish,
@@ -889,10 +877,9 @@ class ServermapUpdater:
          n,
          prefix,
          offsets) = verinfo
+
         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
 
-        # XXX: This should be done for us in the method, so
-        # presumably you can go in there and fix it.
         verinfo = (seqnum,
                    root_hash,
                    saltish,
@@ -902,7 +889,15 @@ class ServermapUpdater:
                    n,
                    prefix,
                    offsets_tuple)
+        return verinfo
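
_make_verinfo_hashable exists because the offsets field arrives as a dict, and a tuple containing a dict can be neither a set member nor a dict key; flattening it to a tuple of pairs fixes that. A quick illustration:

offsets = {"signature": 1234, "share_data": 2048}        # made-up offsets
offsets_tuple = tuple((key, value) for key, value in offsets.items())

verinfo = (1, b"root", b"salt", 131072, 4096, 3, 10, b"prefix", offsets_tuple)
seen = set()
seen.add(verinfo)    # fine; with the raw dict inside, this raises TypeError
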
 
+    def _got_update_results_one_share(self, results, share):
+        """
+        I record the update results in results.
+        """
+        assert len(results) == 4
+        verinfo, blockhashes, start, end = results
+        verinfo = self._make_verinfo_hashable(verinfo)
         update_data = (blockhashes, start, end)
         self._servermap.set_update_data_for_share_and_verinfo(share,
                                                               verinfo,
@@ -914,7 +909,7 @@ class ServermapUpdater:
         return verifier
 
 
-    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
+    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
         """
         Given a writekey from a remote server, I validate it against the
         writekey stored in my node. If it is valid, then I set the
@@ -924,22 +919,22 @@ class ServermapUpdater:
         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
         if alleged_writekey != self._node.get_writekey():
             self.log("invalid privkey from %s shnum %d" %
-                     (idlib.nodeid_b2a(peerid)[:8], shnum),
+                     (server.get_name(), shnum),
                      parent=lp, level=log.WEIRD, umid="aJVccw")
             return
 
         # it's good
-        self.log("got valid privkey from shnum %d on peerid %s" %
-                 (shnum, idlib.shortnodeid_b2a(peerid)),
+        self.log("got valid privkey from shnum %d on serverid %s" %
+                 (shnum, server.get_name()),
                  parent=lp)
         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
         self._node._populate_encprivkey(enc_privkey)
         self._node._populate_privkey(privkey)
         self._need_privkey = False
-        self._status.set_privkey_from(peerid)
+        self._status.set_privkey_from(server)
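
The check above never trusts the server: the fetched key bytes are hashed and compared against the writekey the node already derived from its write cap. A rough sketch of the shape of that comparison; sha256 is only a stand-in for hashutil.ssk_writekey_hash:

import hashlib

def privkey_matches(alleged_privkey_s, expected_writekey):
    # stand-in hash; the real check uses hashutil.ssk_writekey_hash
    alleged_writekey = hashlib.sha256(alleged_privkey_s).digest()[:16]
    return alleged_writekey == expected_writekey
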
 
 
-    def _add_lease_failed(self, f, peerid, storage_index):
+    def _add_lease_failed(self, f, server, storage_index):
         # Older versions of Tahoe didn't handle the add-lease message very
         # well: <=1.1.0 throws a NameError because it doesn't implement
         # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
@@ -959,20 +954,20 @@ class ServermapUpdater:
                 # this may ignore a bit too much, but that only hurts us
                 # during debugging
                 return
-            self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
-                     peerid=idlib.shortnodeid_b2a(peerid),
+            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
+                     name=server.get_name(),
                      f_value=str(f.value),
                      failure=f,
                      level=log.WEIRD, umid="iqg3mw")
             return
         # local errors are cause for alarm
         log.err(f,
-                format="local error in add_lease to [%(peerid)s]: %(f_value)s",
-                peerid=idlib.shortnodeid_b2a(peerid),
+                format="local error in add_lease to [%(name)s]: %(f_value)s",
+                name=server.get_name(),
                 f_value=str(f.value),
                 level=log.WEIRD, umid="ZWh6HA")
 
-    def _query_failed(self, f, peerid):
+    def _query_failed(self, f, server):
         if not self._running:
             return
         level = log.WEIRD
@@ -981,20 +976,20 @@ class ServermapUpdater:
         self.log(format="error during query: %(f_value)s",
                  f_value=str(f.value), failure=f,
                  level=level, umid="IHXuQg")
-        self._must_query.discard(peerid)
-        self._queries_outstanding.discard(peerid)
-        self._bad_peers.add(peerid)
-        self._servermap.problems.append(f)
-        # a peerid could be in both ServerMap.reachable_peers and
-        # .unreachable_peers if they responded to our query, but then an
+        self._must_query.discard(server)
+        self._queries_outstanding.discard(server)
+        self._bad_servers.add(server)
+        self._servermap.add_problem(f)
+        # a server could be in both ServerMap.reachable_servers and
+        # .unreachable_servers if they responded to our query, but then an
         # exception was raised in _got_results.
-        self._servermap.unreachable_peers.add(peerid)
+        self._servermap.mark_server_unreachable(server)
         self._queries_completed += 1
         self._last_failure = f
 
 
-    def _privkey_query_failed(self, f, peerid, shnum, lp):
-        self._queries_outstanding.discard(peerid)
+    def _privkey_query_failed(self, f, server, shnum, lp):
+        self._queries_outstanding.discard(server)
         if not self._running:
             return
         level = log.WEIRD
@@ -1003,7 +998,7 @@ class ServermapUpdater:
         self.log(format="error during privkey query: %(f_value)s",
                  f_value=str(f.value), failure=f,
                  parent=lp, level=level, umid="McoJ5w")
-        self._servermap.problems.append(f)
+        self._servermap.add_problem(f)
         self._last_failure = f
 
 
@@ -1014,13 +1009,13 @@ class ServermapUpdater:
         #  return : keep waiting, no new queries
         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                               "%(outstanding)d queries outstanding, "
-                              "%(extra)d extra peers available, "
-                              "%(must)d 'must query' peers left, "
+                              "%(extra)d extra servers available, "
+                              "%(must)d 'must query' servers left, "
                               "need_privkey=%(need_privkey)s"
                               ),
                       mode=self.mode,
                       outstanding=len(self._queries_outstanding),
-                      extra=len(self.extra_peers),
+                      extra=len(self.extra_servers),
                       must=len(self._must_query),
                       need_privkey=self._need_privkey,
                       level=log.NOISY,
@@ -1031,17 +1026,17 @@ class ServermapUpdater:
             return
 
         if self._must_query:
-            # we are still waiting for responses from peers that used to have
+            # we are still waiting for responses from servers that used to have
             # a share, so we must continue to wait. No additional queries are
             # required at this time.
-            self.log("%d 'must query' peers left" % len(self._must_query),
+            self.log("%d 'must query' servers left" % len(self._must_query),
                      level=log.NOISY, parent=lp)
             return
 
-        if (not self._queries_outstanding and not self.extra_peers):
-            # all queries have retired, and we have no peers left to ask. No
+        if (not self._queries_outstanding and not self.extra_servers):
+            # all queries have retired, and we have no servers left to ask. No
             # more progress can be made, therefore we are done.
-            self.log("all queries are retired, no extra peers: done",
+            self.log("all queries are retired, no extra servers: done",
                      parent=lp)
             return self._done()
 
@@ -1057,7 +1052,7 @@ class ServermapUpdater:
                          parent=lp)
                 return self._done()
 
-        if self.mode == MODE_CHECK:
+        if self.mode in (MODE_CHECK, MODE_REPAIR):
             # we used self._must_query, and we know there aren't any
             # responses still waiting, so that means we must be done
             self.log("done", parent=lp)
@@ -1069,10 +1064,10 @@ class ServermapUpdater:
             # version, and we haven't seen any unrecoverable higher-seqnum'ed
             # versions, then we're done.
 
-            if self._queries_completed < self.num_peers_to_query:
+            if self._queries_completed < self.num_servers_to_query:
                 self.log(format="%(completed)d completed, %(query)d to query: need more",
                          completed=self._queries_completed,
-                         query=self.num_peers_to_query,
+                         query=self.num_servers_to_query,
                          level=log.NOISY, parent=lp)
                 return self._send_more_queries(MAX_IN_FLIGHT)
             if not recoverable_versions:
@@ -1113,15 +1108,15 @@ class ServermapUpdater:
             states = []
             found_boundary = False
 
-            for i,(peerid,ss) in enumerate(self.full_peerlist):
-                if peerid in self._bad_peers:
+            for i,server in enumerate(self.full_serverlist):
+                if server in self._bad_servers:
                     # query failed
                     states.append("x")
-                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
-                elif peerid in self._empty_peers:
+                    #self.log("loop [%s]: x" % server.get_name())
+                elif server in self._empty_servers:
                     # no shares
                     states.append("0")
-                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
+                    #self.log("loop [%s]: 0" % server.get_name())
                     if last_found != -1:
                         num_not_found += 1
                         if num_not_found >= self.EPSILON:
@@ -1131,16 +1126,16 @@ class ServermapUpdater:
                             found_boundary = True
                             break
 
-                elif peerid in self._good_peers:
+                elif server in self._good_servers:
                     # yes shares
                     states.append("1")
-                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
+                    #self.log("loop [%s]: 1" % server.get_name())
                     last_found = i
                     num_not_found = 0
                 else:
                     # not responded yet
                     states.append("?")
-                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
+                    #self.log("loop [%s]: ?" % server.get_name())
                     last_not_responded = i
                     num_not_responded += 1
 
@@ -1166,7 +1161,7 @@ class ServermapUpdater:
                 return self._send_more_queries(num_not_responded)
 
             # if we hit here, we didn't find our boundary, so we're still
-            # waiting for peers
+            # waiting for servers
             self.log("no boundary yet, %s" % "".join(states), parent=lp,
                      level=log.NOISY)
             return self._send_more_queries(MAX_IN_FLIGHT)
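
The boundary search above is a small state machine over the permuted server list. A sketch using the same one-character states ('1' had shares, '0' empty, 'x' failed, '?' not yet answered):

EPSILON = 3
states = ["1", "1", "0", "1", "0", "0", "0", "?"]

last_found = -1
num_not_found = 0
found_boundary = False
for i, state in enumerate(states):
    if state == "1":
        last_found = i
        num_not_found = 0
    elif state == "0" and last_found != -1:
        num_not_found += 1
        if num_not_found >= EPSILON:
            found_boundary = True
            break
# found_boundary is True: three empty servers follow the last one with shares
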
@@ -1187,18 +1182,17 @@ class ServermapUpdater:
             active_queries = len(self._queries_outstanding) + len(more_queries)
             if active_queries >= num_outstanding:
                 break
-            if not self.extra_peers:
+            if not self.extra_servers:
                 break
-            more_queries.append(self.extra_peers.pop(0))
+            more_queries.append(self.extra_servers.pop(0))
 
         self.log(format="sending %(more)d more queries: %(who)s",
                  more=len(more_queries),
-                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
-                               for (peerid,ss) in more_queries]),
+                 who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
                  level=log.NOISY)
 
-        for (peerid, ss) in more_queries:
-            self._do_query(ss, peerid, self._storage_index, self._read_size)
+        for server in more_queries:
+            self._do_query(server, self._storage_index, self._read_size)
             # we'll retrigger when those queries come back
 
     def _done(self):
@@ -1214,8 +1208,7 @@ class ServermapUpdater:
         self._status.set_status("Finished")
         self._status.set_active(False)
 
-        self._servermap.last_update_mode = self.mode
-        self._servermap.last_update_time = self._started
+        self._servermap.set_last_update(self.mode, self._started)
         # the servermap will not be touched after this
         self.log("servermap: %s" % self._servermap.summarize_versions())