mutable/servermap: Rework the servermap to work with MDMF mutable files
authorKevan Carstensen <kevan@isnotajoke.com>
Sun, 7 Aug 2011 00:42:59 +0000 (17:42 -0700)
committerKevan Carstensen <kevan@isnotajoke.com>
Sun, 7 Aug 2011 00:42:59 +0000 (17:42 -0700)
src/allmydata/mutable/servermap.py

index c69e4108952b91b49662c7f61069ee13213f62a4..cb93fc5ddaa74d074765ec2f76b00caf6f9d12f3 100644 (file)
@@ -4,17 +4,17 @@ from zope.interface import implements
 from itertools import count
 from twisted.internet import defer
 from twisted.python import failure
-from foolscap.api import DeadReferenceError, RemoteException, eventually
-from allmydata.util import base32, hashutil, idlib, log
+from foolscap.api import DeadReferenceError, RemoteException, eventually, \
+                         fireEventually
+from allmydata.util import base32, hashutil, idlib, log, deferredutil
 from allmydata.util.dictutil import DictOfSets
 from allmydata.storage.server import si_b2a
 from allmydata.interfaces import IServermapUpdaterStatus
 from pycryptopp.publickey import rsa
 
 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
-     CorruptShareError, NeedMoreDataError
-from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
-     SIGNED_PREFIX_LENGTH
+     CorruptShareError
+from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
 
 class UpdateStatus:
     implements(IServermapUpdaterStatus)
@@ -121,6 +121,7 @@ class ServerMap:
         self.bad_shares = {} # maps (peerid,shnum) to old checkstring
         self.last_update_mode = None
         self.last_update_time = 0
+        self.update_data = {} # (verinfo,shnum) => data
 
     def copy(self):
         s = ServerMap()
@@ -251,7 +252,6 @@ class ServerMap:
         """Return a set of versionids, one for each version that is currently
         recoverable."""
         versionmap = self.make_versionmap()
-
         recoverable_versions = set()
         for (verinfo, shares) in versionmap.items():
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
@@ -337,9 +337,25 @@ class ServerMap:
         return False
 
 
+    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
+        """
+        I return the update data for the given shnum
+        """
+        update_data = self.update_data[shnum]
+        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
+        return update_datum
+
+
+    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
+        """
+        I record the block hash tree for the given shnum.
+        """
+        self.update_data.setdefault(shnum , []).append((verinfo, data))
+
+
 class ServermapUpdater:
     def __init__(self, filenode, storage_broker, monitor, servermap,
-                 mode=MODE_READ, add_lease=False):
+                 mode=MODE_READ, add_lease=False, update_range=None):
         """I update a servermap, locating a sufficient number of useful
         shares and remembering where they are located.
 
@@ -364,6 +380,7 @@ class ServermapUpdater:
         self._servers_responded = set()
 
         # how much data should we read?
+        # SDMF:
         #  * if we only need the checkstring, then [0:75]
         #  * if we need to validate the checkstring sig, then [543ish:799ish]
         #  * if we need the verification key, then [107:436ish]
@@ -371,21 +388,38 @@ class ServermapUpdater:
         #  * if we need the encrypted private key, we want [-1216ish:]
         #   * but we can't read from negative offsets
         #   * the offset table tells us the 'ish', also the positive offset
-        # A future version of the SMDF slot format should consider using
-        # fixed-size slots so we can retrieve less data. For now, we'll just
-        # read 4000 bytes, which also happens to read enough actual data to
-        # pre-fetch an 18-entry dirnode.
+        # MDMF:
+        #  * Checkstring? [0:72]
+        #  * If we want to validate the checkstring, then [0:72], [143:?] --
+        #    the offset table will tell us for sure.
+        #  * If we need the verification key, we have to consult the offset
+        #    table as well.
+        # At this point, we don't know which we are. Our filenode can
+        # tell us, but it might be lying -- in some cases, we're
+        # responsible for telling it which kind of file it is.
         self._read_size = 4000
         if mode == MODE_CHECK:
             # we use unpack_prefix_and_signature, so we need 1k
             self._read_size = 1000
         self._need_privkey = False
+
         if mode == MODE_WRITE and not self._node.get_privkey():
             self._need_privkey = True
         # check+repair: repair requires the privkey, so if we didn't happen
         # to ask for it during the check, we'll have problems doing the
         # publish.
 
+        self.fetch_update_data = False
+        if mode == MODE_WRITE and update_range:
+            # We're updating the servermap in preparation for an
+            # in-place file update, so we need to fetch some additional
+            # data from each share that we find.
+            assert len(update_range) == 2
+
+            self.start_segment = update_range[0]
+            self.end_segment = update_range[1]
+            self.fetch_update_data = True
+
         prefix = si_b2a(self._storage_index)[:5]
         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                    si=prefix, mode=mode)
@@ -424,6 +458,7 @@ class ServermapUpdater:
         self._queries_completed = 0
 
         sb = self._storage_broker
+        # All of the peers, permuted by the storage index, as usual.
         full_peerlist = [(s.get_serverid(), s.get_rref())
                          for s in sb.get_servers_for_psi(self._storage_index)]
         self.full_peerlist = full_peerlist # for use later, immutable
@@ -431,8 +466,11 @@ class ServermapUpdater:
         self._good_peers = set() # peers who had some shares
         self._empty_peers = set() # peers who don't have any shares
         self._bad_peers = set() # peers to whom our queries failed
+        self._readers = {} # peerid -> dict(sharewriters), filled in
+                           # after responses come in.
 
         k = self._node.get_required_shares()
+        # For what cases can these conditions work?
         if k is None:
             # make a guess
             k = 3
@@ -445,6 +483,7 @@ class ServermapUpdater:
         self.num_peers_to_query = k + self.EPSILON
 
         if self.mode == MODE_CHECK:
+            # We want to query all of the peers.
             initial_peers_to_query = dict(full_peerlist)
             must_query = set(initial_peers_to_query.keys())
             self.extra_peers = []
@@ -452,6 +491,7 @@ class ServermapUpdater:
             # we're planning to replace all the shares, so we want a good
             # chance of finding them all. We will keep searching until we've
             # seen epsilon that don't have a share.
+            # We don't query all of the peers because that could take a while.
             self.num_peers_to_query = N + self.EPSILON
             initial_peers_to_query, must_query = self._build_initial_querylist()
             self.required_num_empty_peers = self.EPSILON
@@ -461,7 +501,8 @@ class ServermapUpdater:
             # might also avoid the round trip required to read the encrypted
             # private key.
 
-        else:
+        else: # MODE_READ, MODE_ANYTHING
+            # 2k peers is good enough.
             initial_peers_to_query, must_query = self._build_initial_querylist()
 
         # this is a set of peers that we are required to get responses from:
@@ -476,6 +517,9 @@ class ServermapUpdater:
         # before we can consider ourselves finished, and self.extra_peers
         # contains the overflow (peers that we should tap if we don't get
         # enough responses)
+        # I guess that self._must_query is a subset of
+        # initial_peers_to_query?
+        assert set(must_query).issubset(set(initial_peers_to_query))
 
         self._send_initial_requests(initial_peers_to_query)
         self._status.timings["initial_queries"] = time.time() - self._started
@@ -532,8 +576,8 @@ class ServermapUpdater:
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
         d.addErrback(log.err)
-        d.addBoth(self._check_for_done)
         d.addErrback(self._fatal_error)
+        d.addCallback(self._check_for_done)
         return d
 
     def _do_read(self, ss, peerid, storage_index, shnums, readv):
@@ -552,20 +596,59 @@ class ServermapUpdater:
         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
         return d
 
+
+    def _got_corrupt_share(self, e, shnum, peerid, data, lp):
+        """
+        I am called when a remote server returns a corrupt share in
+        response to one of our queries. By corrupt, I mean a share
+        without a valid signature. I then record the failure, notify the
+        server of the corruption, and record the share as bad.
+        """
+        f = failure.Failure(e)
+        self.log(format="bad share: %(f_value)s", f_value=str(f),
+                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
+        # Notify the server that its share is corrupt.
+        self.notify_server_corruption(peerid, shnum, str(e))
+        # By flagging this as a bad peer, we won't count any of
+        # the other shares on that peer as valid, though if we
+        # happen to find a valid version string amongst those
+        # shares, we'll keep track of it so that we don't need
+        # to validate the signature on those again.
+        self._bad_peers.add(peerid)
+        self._last_failure = f
+        # XXX: Use the reader for this?
+        checkstring = data[:SIGNED_PREFIX_LENGTH]
+        self._servermap.mark_bad_share(peerid, shnum, checkstring)
+        self._servermap.problems.append(f)
+
+
+    def _cache_good_sharedata(self, verinfo, shnum, now, data):
+        """
+        If one of my queries returns successfully (which means that we
+        were able to and successfully did validate the signature), I
+        cache the data that we initially fetched from the storage
+        server. This will help reduce the number of roundtrips that need
+        to occur when the file is downloaded, or when the file is
+        updated.
+        """
+        if verinfo:
+            self._node._add_to_cache(verinfo, shnum, 0, data)
+
+
     def _got_results(self, datavs, peerid, readsize, stuff, started):
         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                       peerid=idlib.shortnodeid_b2a(peerid),
-                      numshares=len(datavs),
-                      level=log.NOISY)
+                      numshares=len(datavs))
         now = time.time()
         elapsed = now - started
-        self._queries_outstanding.discard(peerid)
-        self._servermap.reachable_peers.add(peerid)
-        self._must_query.discard(peerid)
-        self._queries_completed += 1
+        def _done_processing(ignored=None):
+            self._queries_outstanding.discard(peerid)
+            self._servermap.reachable_peers.add(peerid)
+            self._must_query.discard(peerid)
+            self._queries_completed += 1
         if not self._running:
-            self.log("but we're not running, so we'll ignore it", parent=lp,
-                     level=log.NOISY)
+            self.log("but we're not running, so we'll ignore it", parent=lp)
+            _done_processing()
             self._status.add_per_server_time(peerid, "late", started, elapsed)
             return
         self._status.add_per_server_time(peerid, "query", started, elapsed)
@@ -575,107 +658,209 @@ class ServermapUpdater:
         else:
             self._empty_peers.add(peerid)
 
-        last_verinfo = None
-        last_shnum = None
+        ss, storage_index = stuff
+        ds = []
+
         for shnum,datav in datavs.items():
             data = datav[0]
-            try:
-                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
-                last_verinfo = verinfo
-                last_shnum = shnum
-                self._node._add_to_cache(verinfo, shnum, 0, data)
-            except CorruptShareError, e:
-                # log it and give the other shares a chance to be processed
-                f = failure.Failure()
-                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
-                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
-                self.notify_server_corruption(peerid, shnum, str(e))
-                self._bad_peers.add(peerid)
-                self._last_failure = f
-                checkstring = data[:SIGNED_PREFIX_LENGTH]
-                self._servermap.mark_bad_share(peerid, shnum, checkstring)
-                self._servermap.problems.append(f)
-                pass
-
-        self._status.timings["cumulative_verify"] += (time.time() - now)
-
-        if self._need_privkey and last_verinfo:
-            # send them a request for the privkey. We send one request per
-            # server.
-            lp2 = self.log("sending privkey request",
-                           parent=lp, level=log.NOISY)
-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-             offsets_tuple) = last_verinfo
-            o = dict(offsets_tuple)
-
-            self._queries_outstanding.add(peerid)
-            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
-            ss = self._servermap.connections[peerid]
-            privkey_started = time.time()
-            d = self._do_read(ss, peerid, self._storage_index,
-                              [last_shnum], readv)
-            d.addCallback(self._got_privkey_results, peerid, last_shnum,
-                          privkey_started, lp2)
-            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
-            d.addErrback(log.err)
-            d.addCallback(self._check_for_done)
-            d.addErrback(self._fatal_error)
-
+            reader = MDMFSlotReadProxy(ss,
+                                       storage_index,
+                                       shnum,
+                                       data)
+            self._readers.setdefault(peerid, dict())[shnum] = reader
+            # our goal, with each response, is to validate the version
+            # information and share data as best we can at this point --
+            # we do this by validating the signature. To do this, we
+            # need to do the following:
+            #   - If we don't already have the public key, fetch the
+            #     public key. We use this to validate the signature.
+            if not self._node.get_pubkey():
+                # fetch and set the public key.
+                d = reader.get_verification_key(queue=True)
+                d.addCallback(lambda results, shnum=shnum, peerid=peerid:
+                    self._try_to_set_pubkey(results, peerid, shnum, lp))
+                # XXX: Make self._pubkey_query_failed?
+                d.addErrback(lambda error, shnum=shnum, peerid=peerid:
+                    self._got_corrupt_share(error, shnum, peerid, data, lp))
+            else:
+                # we already have the public key.
+                d = defer.succeed(None)
+
+            # Neither of these two branches return anything of
+            # consequence, so the first entry in our deferredlist will
+            # be None.
+
+            # - Next, we need the version information. We almost
+            #   certainly got this by reading the first thousand or so
+            #   bytes of the share on the storage server, so we
+            #   shouldn't need to fetch anything at this step.
+            d2 = reader.get_verinfo()
+            d2.addErrback(lambda error, shnum=shnum, peerid=peerid:
+                self._got_corrupt_share(error, shnum, peerid, data, lp))
+            # - Next, we need the signature. For an SDMF share, it is
+            #   likely that we fetched this when doing our initial fetch
+            #   to get the version information. In MDMF, this lives at
+            #   the end of the share, so unless the file is quite small,
+            #   we'll need to do a remote fetch to get it.
+            d3 = reader.get_signature(queue=True)
+            d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
+                self._got_corrupt_share(error, shnum, peerid, data, lp))
+            #  Once we have all three of these responses, we can move on
+            #  to validating the signature
+
+            # Does the node already have a privkey? If not, we'll try to
+            # fetch it here.
+            if self._need_privkey:
+                d4 = reader.get_encprivkey(queue=True)
+                d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
+                    self._try_to_validate_privkey(results, peerid, shnum, lp))
+                d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
+                    self._privkey_query_failed(error, shnum, data, lp))
+            else:
+                d4 = defer.succeed(None)
+
+
+            if self.fetch_update_data:
+                # fetch the block hash tree and first + last segment, as
+                # configured earlier.
+                # Then set them in wherever we happen to want to set
+                # them.
+                ds = []
+                # XXX: We do this above, too. Is there a good way to
+                # make the two routines share the value without
+                # introducing more roundtrips?
+                ds.append(reader.get_verinfo())
+                ds.append(reader.get_blockhashes(queue=True))
+                ds.append(reader.get_block_and_salt(self.start_segment,
+                                                    queue=True))
+                ds.append(reader.get_block_and_salt(self.end_segment,
+                                                    queue=True))
+                d5 = deferredutil.gatherResults(ds)
+                d5.addCallback(self._got_update_results_one_share, shnum)
+            else:
+                d5 = defer.succeed(None)
+
+            dl = defer.DeferredList([d, d2, d3, d4, d5])
+            dl.addBoth(self._turn_barrier)
+            reader.flush()
+            dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
+                self._got_signature_one_share(results, shnum, peerid, lp))
+            dl.addErrback(lambda error, shnum=shnum, data=data:
+               self._got_corrupt_share(error, shnum, peerid, data, lp))
+            dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
+                self._cache_good_sharedata(verinfo, shnum, now, data))
+            ds.append(dl)
+        # dl is a deferred list that will fire when all of the shares
+        # that we found on this peer are done processing. When dl fires,
+        # we know that processing is done, so we can decrement the
+        # semaphore-like thing that we incremented earlier.
+        dl = defer.DeferredList(ds, fireOnOneErrback=True)
+        # Are we done? Done means that there are no more queries to
+        # send, that there are no outstanding queries, and that we
+        # haven't received any queries that are still processing. If we
+        # are done, self._check_for_done will cause the done deferred
+        # that we returned to our caller to fire, which tells them that
+        # they have a complete servermap, and that we won't be touching
+        # the servermap anymore.
+        dl.addCallback(_done_processing)
+        dl.addCallback(self._check_for_done)
+        dl.addErrback(self._fatal_error)
         # all done!
         self.log("_got_results done", parent=lp, level=log.NOISY)
+        return dl
+
+
+    def _turn_barrier(self, result):
+        """
+        I help the servermap updater avoid the recursion limit issues
+        discussed in #237.
+        """
+        return fireEventually(result)
+
+
+    def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
+        if self._node.get_pubkey():
+            return # don't go through this again if we don't have to
+        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+        assert len(fingerprint) == 32
+        if fingerprint != self._node.get_fingerprint():
+            raise CorruptShareError(peerid, shnum,
+                                "pubkey doesn't match fingerprint")
+        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
+        assert self._node.get_pubkey()
+
 
     def notify_server_corruption(self, peerid, shnum, reason):
         ss = self._servermap.connections[peerid]
         ss.callRemoteOnly("advise_corrupt_share",
                           "mutable", self._storage_index, shnum, reason)
 
-    def _got_results_one_share(self, shnum, data, peerid, lp):
+
+    def _got_signature_one_share(self, results, shnum, peerid, lp):
+        # It is our job to give versioninfo to our caller. We need to
+        # raise CorruptShareError if the share is corrupt for any
+        # reason, something that our caller will handle.
         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                  shnum=shnum,
                  peerid=idlib.shortnodeid_b2a(peerid),
                  level=log.NOISY,
                  parent=lp)
-
-        # this might raise NeedMoreDataError, if the pubkey and signature
-        # live at some weird offset. That shouldn't happen, so I'm going to
-        # treat it as a bad share.
-        (seqnum, root_hash, IV, k, N, segsize, datalength,
-         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
-
-        if not self._node.get_pubkey():
-            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
-            assert len(fingerprint) == 32
-            if fingerprint != self._node.get_fingerprint():
-                raise CorruptShareError(peerid, shnum,
-                                        "pubkey doesn't match fingerprint")
-            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
-
-        if self._need_privkey:
-            self._try_to_extract_privkey(data, peerid, shnum, lp)
-
-        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
-         ig_segsize, ig_datalen, offsets) = unpack_header(data)
+        if not self._running:
+            # We can't process the results, since we can't touch the
+            # servermap anymore.
+            self.log("but we're not running anymore.")
+            return None
+
+        _, verinfo, signature, __, ___ = results
+        (seqnum,
+         root_hash,
+         saltish,
+         segsize,
+         datalen,
+         k,
+         n,
+         prefix,
+         offsets) = verinfo[1]
         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
 
-        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+        # XXX: This should be done for us in the method, so
+        # presumably you can go in there and fix it.
+        verinfo = (seqnum,
+                   root_hash,
+                   saltish,
+                   segsize,
+                   datalen,
+                   k,
+                   n,
+                   prefix,
                    offsets_tuple)
+        # This tuple uniquely identifies a share on the grid; we use it
+        # to keep track of the ones that we've already seen.
 
         if verinfo not in self._valid_versions:
-            # it's a new pair. Verify the signature.
-            valid = self._node.get_pubkey().verify(prefix, signature)
+            # This is a new version tuple, and we need to validate it
+            # against the public key before keeping track of it.
+            assert self._node.get_pubkey()
+            valid = self._node.get_pubkey().verify(prefix, signature[1])
             if not valid:
-                raise CorruptShareError(peerid, shnum, "signature is invalid")
-
-            # ok, it's a valid verinfo. Add it to the list of validated
-            # versions.
-            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
-                     % (seqnum, base32.b2a(root_hash)[:4],
-                        idlib.shortnodeid_b2a(peerid), shnum,
-                        k, N, segsize, datalength),
-                     parent=lp)
-            self._valid_versions.add(verinfo)
-        # We now know that this is a valid candidate verinfo.
-
+                raise CorruptShareError(peerid, shnum,
+                                        "signature is invalid")
+
+        # ok, it's a valid verinfo. Add it to the list of validated
+        # versions.
+        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
+                 % (seqnum, base32.b2a(root_hash)[:4],
+                    idlib.shortnodeid_b2a(peerid), shnum,
+                    k, n, segsize, datalen),
+                    parent=lp)
+        self._valid_versions.add(verinfo)
+        # We now know that this is a valid candidate verinfo. Whether or
+        # not this instance of it is valid is a matter for the next
+        # statement; at this point, we just know that if we see this
+        # version info again, that its signature checks out and that
+        # we're okay to skip the signature-checking step.
+
+        # (peerid, shnum) are bound in the method invocation.
         if (peerid, shnum) in self._servermap.bad_shares:
             # we've been told that the rest of the data in this share is
             # unusable, so don't add it to the servermap.
@@ -688,43 +873,56 @@ class ServermapUpdater:
         self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
         # and the versionmap
         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
+
         return verinfo
 
-    def _deserialize_pubkey(self, pubkey_s):
-        verifier = rsa.create_verifying_key_from_string(pubkey_s)
-        return verifier
 
-    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
-        try:
-            r = unpack_share(data)
-        except NeedMoreDataError, e:
-            # this share won't help us. oh well.
-            offset = e.encprivkey_offset
-            length = e.encprivkey_length
-            self.log("shnum %d on peerid %s: share was too short (%dB) "
-                     "to get the encprivkey; [%d:%d] ought to hold it" %
-                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
-                      offset, offset+length),
-                     parent=lp)
-            # NOTE: if uncoordinated writes are taking place, someone might
-            # change the share (and most probably move the encprivkey) before
-            # we get a chance to do one of these reads and fetch it. This
-            # will cause us to see a NotEnoughSharesError(unable to fetch
-            # privkey) instead of an UncoordinatedWriteError . This is a
-            # nuisance, but it will go away when we move to DSA-based mutable
-            # files (since the privkey will be small enough to fit in the
-            # write cap).
+    def _got_update_results_one_share(self, results, share):
+        """
+        I record the update results in results.
+        """
+        assert len(results) == 4
+        verinfo, blockhashes, start, end = results
+        (seqnum,
+         root_hash,
+         saltish,
+         segsize,
+         datalen,
+         k,
+         n,
+         prefix,
+         offsets) = verinfo
+        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
 
-            return
+        # XXX: This should be done for us in the method, so
+        # presumably you can go in there and fix it.
+        verinfo = (seqnum,
+                   root_hash,
+                   saltish,
+                   segsize,
+                   datalen,
+                   k,
+                   n,
+                   prefix,
+                   offsets_tuple)
 
-        (seqnum, root_hash, IV, k, N, segsize, datalen,
-         pubkey, signature, share_hash_chain, block_hash_tree,
-         share_data, enc_privkey) = r
+        update_data = (blockhashes, start, end)
+        self._servermap.set_update_data_for_share_and_verinfo(share,
+                                                              verinfo,
+                                                              update_data)
 
-        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
 
-    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
+    def _deserialize_pubkey(self, pubkey_s):
+        verifier = rsa.create_verifying_key_from_string(pubkey_s)
+        return verifier
 
+
+    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
+        """
+        Given a writekey from a remote server, I validate it against the
+        writekey stored in my node. If it is valid, then I set the
+        privkey and encprivkey properties of the node.
+        """
         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
         if alleged_writekey != self._node.get_writekey():
@@ -797,20 +995,6 @@ class ServermapUpdater:
         self._queries_completed += 1
         self._last_failure = f
 
-    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
-        now = time.time()
-        elapsed = now - started
-        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
-        self._queries_outstanding.discard(peerid)
-        if not self._need_privkey:
-            return
-        if shnum not in datavs:
-            self.log("privkey wasn't there when we asked it",
-                     level=log.WEIRD, umid="VA9uDQ")
-            return
-        datav = datavs[shnum]
-        enc_privkey = datav[0]
-        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
 
     def _privkey_query_failed(self, f, peerid, shnum, lp):
         self._queries_outstanding.discard(peerid)
@@ -825,12 +1009,12 @@ class ServermapUpdater:
         self._servermap.problems.append(f)
         self._last_failure = f
 
+
     def _check_for_done(self, res):
         # exit paths:
         #  return self._send_more_queries(outstanding) : send some more queries
         #  return self._done() : all done
         #  return : keep waiting, no new queries
-
         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                               "%(outstanding)d queries outstanding, "
                               "%(extra)d extra peers available, "
@@ -1022,6 +1206,7 @@ class ServermapUpdater:
 
     def _done(self):
         if not self._running:
+            self.log("not running; we're already done")
             return
         self._running = False
         now = time.time()
@@ -1036,6 +1221,7 @@ class ServermapUpdater:
         self._servermap.last_update_time = self._started
         # the servermap will not be touched after this
         self.log("servermap: %s" % self._servermap.summarize_versions())
+
         eventually(self._done_deferred.callback, self._servermap)
 
     def _fatal_error(self, f):