From: Kevan Carstensen Date: Sun, 7 Aug 2011 00:42:59 +0000 (-0700) Subject: mutable/servermap: Rework the servermap to work with MDMF mutable files X-Git-Tag: trac-5200~16 X-Git-Url: https://git.rkrishnan.org/maximilian?a=commitdiff_plain;h=bb10d685ed86eb0870bbd09469f912043e405711;p=tahoe-lafs%2Ftahoe-lafs.git mutable/servermap: Rework the servermap to work with MDMF mutable files --- diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index c69e4108..cb93fc5d 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -4,17 +4,17 @@ from zope.interface import implements from itertools import count from twisted.internet import defer from twisted.python import failure -from foolscap.api import DeadReferenceError, RemoteException, eventually -from allmydata.util import base32, hashutil, idlib, log +from foolscap.api import DeadReferenceError, RemoteException, eventually, \ + fireEventually +from allmydata.util import base32, hashutil, idlib, log, deferredutil from allmydata.util.dictutil import DictOfSets from allmydata.storage.server import si_b2a from allmydata.interfaces import IServermapUpdaterStatus from pycryptopp.publickey import rsa from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \ - CorruptShareError, NeedMoreDataError -from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \ - SIGNED_PREFIX_LENGTH + CorruptShareError +from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy class UpdateStatus: implements(IServermapUpdaterStatus) @@ -121,6 +121,7 @@ class ServerMap: self.bad_shares = {} # maps (peerid,shnum) to old checkstring self.last_update_mode = None self.last_update_time = 0 + self.update_data = {} # (verinfo,shnum) => data def copy(self): s = ServerMap() @@ -251,7 +252,6 @@ class ServerMap: """Return a set of versionids, one for each version that is currently recoverable.""" versionmap = self.make_versionmap() - recoverable_versions = set() for (verinfo, shares) in versionmap.items(): (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, @@ -337,9 +337,25 @@ class ServerMap: return False + def get_update_data_for_share_and_verinfo(self, shnum, verinfo): + """ + I return the update data for the given shnum + """ + update_data = self.update_data[shnum] + update_datum = [i[1] for i in update_data if i[0] == verinfo][0] + return update_datum + + + def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data): + """ + I record the block hash tree for the given shnum. + """ + self.update_data.setdefault(shnum , []).append((verinfo, data)) + + class ServermapUpdater: def __init__(self, filenode, storage_broker, monitor, servermap, - mode=MODE_READ, add_lease=False): + mode=MODE_READ, add_lease=False, update_range=None): """I update a servermap, locating a sufficient number of useful shares and remembering where they are located. @@ -364,6 +380,7 @@ class ServermapUpdater: self._servers_responded = set() # how much data should we read? + # SDMF: # * if we only need the checkstring, then [0:75] # * if we need to validate the checkstring sig, then [543ish:799ish] # * if we need the verification key, then [107:436ish] @@ -371,21 +388,38 @@ class ServermapUpdater: # * if we need the encrypted private key, we want [-1216ish:] # * but we can't read from negative offsets # * the offset table tells us the 'ish', also the positive offset - # A future version of the SMDF slot format should consider using - # fixed-size slots so we can retrieve less data. For now, we'll just - # read 4000 bytes, which also happens to read enough actual data to - # pre-fetch an 18-entry dirnode. + # MDMF: + # * Checkstring? [0:72] + # * If we want to validate the checkstring, then [0:72], [143:?] -- + # the offset table will tell us for sure. + # * If we need the verification key, we have to consult the offset + # table as well. + # At this point, we don't know which we are. Our filenode can + # tell us, but it might be lying -- in some cases, we're + # responsible for telling it which kind of file it is. self._read_size = 4000 if mode == MODE_CHECK: # we use unpack_prefix_and_signature, so we need 1k self._read_size = 1000 self._need_privkey = False + if mode == MODE_WRITE and not self._node.get_privkey(): self._need_privkey = True # check+repair: repair requires the privkey, so if we didn't happen # to ask for it during the check, we'll have problems doing the # publish. + self.fetch_update_data = False + if mode == MODE_WRITE and update_range: + # We're updating the servermap in preparation for an + # in-place file update, so we need to fetch some additional + # data from each share that we find. + assert len(update_range) == 2 + + self.start_segment = update_range[0] + self.end_segment = update_range[1] + self.fetch_update_data = True + prefix = si_b2a(self._storage_index)[:5] self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)", si=prefix, mode=mode) @@ -424,6 +458,7 @@ class ServermapUpdater: self._queries_completed = 0 sb = self._storage_broker + # All of the peers, permuted by the storage index, as usual. full_peerlist = [(s.get_serverid(), s.get_rref()) for s in sb.get_servers_for_psi(self._storage_index)] self.full_peerlist = full_peerlist # for use later, immutable @@ -431,8 +466,11 @@ class ServermapUpdater: self._good_peers = set() # peers who had some shares self._empty_peers = set() # peers who don't have any shares self._bad_peers = set() # peers to whom our queries failed + self._readers = {} # peerid -> dict(sharewriters), filled in + # after responses come in. k = self._node.get_required_shares() + # For what cases can these conditions work? if k is None: # make a guess k = 3 @@ -445,6 +483,7 @@ class ServermapUpdater: self.num_peers_to_query = k + self.EPSILON if self.mode == MODE_CHECK: + # We want to query all of the peers. initial_peers_to_query = dict(full_peerlist) must_query = set(initial_peers_to_query.keys()) self.extra_peers = [] @@ -452,6 +491,7 @@ class ServermapUpdater: # we're planning to replace all the shares, so we want a good # chance of finding them all. We will keep searching until we've # seen epsilon that don't have a share. + # We don't query all of the peers because that could take a while. self.num_peers_to_query = N + self.EPSILON initial_peers_to_query, must_query = self._build_initial_querylist() self.required_num_empty_peers = self.EPSILON @@ -461,7 +501,8 @@ class ServermapUpdater: # might also avoid the round trip required to read the encrypted # private key. - else: + else: # MODE_READ, MODE_ANYTHING + # 2k peers is good enough. initial_peers_to_query, must_query = self._build_initial_querylist() # this is a set of peers that we are required to get responses from: @@ -476,6 +517,9 @@ class ServermapUpdater: # before we can consider ourselves finished, and self.extra_peers # contains the overflow (peers that we should tap if we don't get # enough responses) + # I guess that self._must_query is a subset of + # initial_peers_to_query? + assert set(must_query).issubset(set(initial_peers_to_query)) self._send_initial_requests(initial_peers_to_query) self._status.timings["initial_queries"] = time.time() - self._started @@ -532,8 +576,8 @@ class ServermapUpdater: # errors that aren't handled by _query_failed (and errors caused by # _query_failed) get logged, but we still want to check for doneness. d.addErrback(log.err) - d.addBoth(self._check_for_done) d.addErrback(self._fatal_error) + d.addCallback(self._check_for_done) return d def _do_read(self, ss, peerid, storage_index, shnums, readv): @@ -552,20 +596,59 @@ class ServermapUpdater: d = ss.callRemote("slot_readv", storage_index, shnums, readv) return d + + def _got_corrupt_share(self, e, shnum, peerid, data, lp): + """ + I am called when a remote server returns a corrupt share in + response to one of our queries. By corrupt, I mean a share + without a valid signature. I then record the failure, notify the + server of the corruption, and record the share as bad. + """ + f = failure.Failure(e) + self.log(format="bad share: %(f_value)s", f_value=str(f), + failure=f, parent=lp, level=log.WEIRD, umid="h5llHg") + # Notify the server that its share is corrupt. + self.notify_server_corruption(peerid, shnum, str(e)) + # By flagging this as a bad peer, we won't count any of + # the other shares on that peer as valid, though if we + # happen to find a valid version string amongst those + # shares, we'll keep track of it so that we don't need + # to validate the signature on those again. + self._bad_peers.add(peerid) + self._last_failure = f + # XXX: Use the reader for this? + checkstring = data[:SIGNED_PREFIX_LENGTH] + self._servermap.mark_bad_share(peerid, shnum, checkstring) + self._servermap.problems.append(f) + + + def _cache_good_sharedata(self, verinfo, shnum, now, data): + """ + If one of my queries returns successfully (which means that we + were able to and successfully did validate the signature), I + cache the data that we initially fetched from the storage + server. This will help reduce the number of roundtrips that need + to occur when the file is downloaded, or when the file is + updated. + """ + if verinfo: + self._node._add_to_cache(verinfo, shnum, 0, data) + + def _got_results(self, datavs, peerid, readsize, stuff, started): lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares", peerid=idlib.shortnodeid_b2a(peerid), - numshares=len(datavs), - level=log.NOISY) + numshares=len(datavs)) now = time.time() elapsed = now - started - self._queries_outstanding.discard(peerid) - self._servermap.reachable_peers.add(peerid) - self._must_query.discard(peerid) - self._queries_completed += 1 + def _done_processing(ignored=None): + self._queries_outstanding.discard(peerid) + self._servermap.reachable_peers.add(peerid) + self._must_query.discard(peerid) + self._queries_completed += 1 if not self._running: - self.log("but we're not running, so we'll ignore it", parent=lp, - level=log.NOISY) + self.log("but we're not running, so we'll ignore it", parent=lp) + _done_processing() self._status.add_per_server_time(peerid, "late", started, elapsed) return self._status.add_per_server_time(peerid, "query", started, elapsed) @@ -575,107 +658,209 @@ class ServermapUpdater: else: self._empty_peers.add(peerid) - last_verinfo = None - last_shnum = None + ss, storage_index = stuff + ds = [] + for shnum,datav in datavs.items(): data = datav[0] - try: - verinfo = self._got_results_one_share(shnum, data, peerid, lp) - last_verinfo = verinfo - last_shnum = shnum - self._node._add_to_cache(verinfo, shnum, 0, data) - except CorruptShareError, e: - # log it and give the other shares a chance to be processed - f = failure.Failure() - self.log(format="bad share: %(f_value)s", f_value=str(f.value), - failure=f, parent=lp, level=log.WEIRD, umid="h5llHg") - self.notify_server_corruption(peerid, shnum, str(e)) - self._bad_peers.add(peerid) - self._last_failure = f - checkstring = data[:SIGNED_PREFIX_LENGTH] - self._servermap.mark_bad_share(peerid, shnum, checkstring) - self._servermap.problems.append(f) - pass - - self._status.timings["cumulative_verify"] += (time.time() - now) - - if self._need_privkey and last_verinfo: - # send them a request for the privkey. We send one request per - # server. - lp2 = self.log("sending privkey request", - parent=lp, level=log.NOISY) - (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, - offsets_tuple) = last_verinfo - o = dict(offsets_tuple) - - self._queries_outstanding.add(peerid) - readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ] - ss = self._servermap.connections[peerid] - privkey_started = time.time() - d = self._do_read(ss, peerid, self._storage_index, - [last_shnum], readv) - d.addCallback(self._got_privkey_results, peerid, last_shnum, - privkey_started, lp2) - d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2) - d.addErrback(log.err) - d.addCallback(self._check_for_done) - d.addErrback(self._fatal_error) - + reader = MDMFSlotReadProxy(ss, + storage_index, + shnum, + data) + self._readers.setdefault(peerid, dict())[shnum] = reader + # our goal, with each response, is to validate the version + # information and share data as best we can at this point -- + # we do this by validating the signature. To do this, we + # need to do the following: + # - If we don't already have the public key, fetch the + # public key. We use this to validate the signature. + if not self._node.get_pubkey(): + # fetch and set the public key. + d = reader.get_verification_key(queue=True) + d.addCallback(lambda results, shnum=shnum, peerid=peerid: + self._try_to_set_pubkey(results, peerid, shnum, lp)) + # XXX: Make self._pubkey_query_failed? + d.addErrback(lambda error, shnum=shnum, peerid=peerid: + self._got_corrupt_share(error, shnum, peerid, data, lp)) + else: + # we already have the public key. + d = defer.succeed(None) + + # Neither of these two branches return anything of + # consequence, so the first entry in our deferredlist will + # be None. + + # - Next, we need the version information. We almost + # certainly got this by reading the first thousand or so + # bytes of the share on the storage server, so we + # shouldn't need to fetch anything at this step. + d2 = reader.get_verinfo() + d2.addErrback(lambda error, shnum=shnum, peerid=peerid: + self._got_corrupt_share(error, shnum, peerid, data, lp)) + # - Next, we need the signature. For an SDMF share, it is + # likely that we fetched this when doing our initial fetch + # to get the version information. In MDMF, this lives at + # the end of the share, so unless the file is quite small, + # we'll need to do a remote fetch to get it. + d3 = reader.get_signature(queue=True) + d3.addErrback(lambda error, shnum=shnum, peerid=peerid: + self._got_corrupt_share(error, shnum, peerid, data, lp)) + # Once we have all three of these responses, we can move on + # to validating the signature + + # Does the node already have a privkey? If not, we'll try to + # fetch it here. + if self._need_privkey: + d4 = reader.get_encprivkey(queue=True) + d4.addCallback(lambda results, shnum=shnum, peerid=peerid: + self._try_to_validate_privkey(results, peerid, shnum, lp)) + d4.addErrback(lambda error, shnum=shnum, peerid=peerid: + self._privkey_query_failed(error, shnum, data, lp)) + else: + d4 = defer.succeed(None) + + + if self.fetch_update_data: + # fetch the block hash tree and first + last segment, as + # configured earlier. + # Then set them in wherever we happen to want to set + # them. + ds = [] + # XXX: We do this above, too. Is there a good way to + # make the two routines share the value without + # introducing more roundtrips? + ds.append(reader.get_verinfo()) + ds.append(reader.get_blockhashes(queue=True)) + ds.append(reader.get_block_and_salt(self.start_segment, + queue=True)) + ds.append(reader.get_block_and_salt(self.end_segment, + queue=True)) + d5 = deferredutil.gatherResults(ds) + d5.addCallback(self._got_update_results_one_share, shnum) + else: + d5 = defer.succeed(None) + + dl = defer.DeferredList([d, d2, d3, d4, d5]) + dl.addBoth(self._turn_barrier) + reader.flush() + dl.addCallback(lambda results, shnum=shnum, peerid=peerid: + self._got_signature_one_share(results, shnum, peerid, lp)) + dl.addErrback(lambda error, shnum=shnum, data=data: + self._got_corrupt_share(error, shnum, peerid, data, lp)) + dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data: + self._cache_good_sharedata(verinfo, shnum, now, data)) + ds.append(dl) + # dl is a deferred list that will fire when all of the shares + # that we found on this peer are done processing. When dl fires, + # we know that processing is done, so we can decrement the + # semaphore-like thing that we incremented earlier. + dl = defer.DeferredList(ds, fireOnOneErrback=True) + # Are we done? Done means that there are no more queries to + # send, that there are no outstanding queries, and that we + # haven't received any queries that are still processing. If we + # are done, self._check_for_done will cause the done deferred + # that we returned to our caller to fire, which tells them that + # they have a complete servermap, and that we won't be touching + # the servermap anymore. + dl.addCallback(_done_processing) + dl.addCallback(self._check_for_done) + dl.addErrback(self._fatal_error) # all done! self.log("_got_results done", parent=lp, level=log.NOISY) + return dl + + + def _turn_barrier(self, result): + """ + I help the servermap updater avoid the recursion limit issues + discussed in #237. + """ + return fireEventually(result) + + + def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp): + if self._node.get_pubkey(): + return # don't go through this again if we don't have to + fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) + assert len(fingerprint) == 32 + if fingerprint != self._node.get_fingerprint(): + raise CorruptShareError(peerid, shnum, + "pubkey doesn't match fingerprint") + self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s)) + assert self._node.get_pubkey() + def notify_server_corruption(self, peerid, shnum, reason): ss = self._servermap.connections[peerid] ss.callRemoteOnly("advise_corrupt_share", "mutable", self._storage_index, shnum, reason) - def _got_results_one_share(self, shnum, data, peerid, lp): + + def _got_signature_one_share(self, results, shnum, peerid, lp): + # It is our job to give versioninfo to our caller. We need to + # raise CorruptShareError if the share is corrupt for any + # reason, something that our caller will handle. self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s", shnum=shnum, peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, parent=lp) - - # this might raise NeedMoreDataError, if the pubkey and signature - # live at some weird offset. That shouldn't happen, so I'm going to - # treat it as a bad share. - (seqnum, root_hash, IV, k, N, segsize, datalength, - pubkey_s, signature, prefix) = unpack_prefix_and_signature(data) - - if not self._node.get_pubkey(): - fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) - assert len(fingerprint) == 32 - if fingerprint != self._node.get_fingerprint(): - raise CorruptShareError(peerid, shnum, - "pubkey doesn't match fingerprint") - self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s)) - - if self._need_privkey: - self._try_to_extract_privkey(data, peerid, shnum, lp) - - (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N, - ig_segsize, ig_datalen, offsets) = unpack_header(data) + if not self._running: + # We can't process the results, since we can't touch the + # servermap anymore. + self.log("but we're not running anymore.") + return None + + _, verinfo, signature, __, ___ = results + (seqnum, + root_hash, + saltish, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo[1] offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] ) - verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + # XXX: This should be done for us in the method, so + # presumably you can go in there and fix it. + verinfo = (seqnum, + root_hash, + saltish, + segsize, + datalen, + k, + n, + prefix, offsets_tuple) + # This tuple uniquely identifies a share on the grid; we use it + # to keep track of the ones that we've already seen. if verinfo not in self._valid_versions: - # it's a new pair. Verify the signature. - valid = self._node.get_pubkey().verify(prefix, signature) + # This is a new version tuple, and we need to validate it + # against the public key before keeping track of it. + assert self._node.get_pubkey() + valid = self._node.get_pubkey().verify(prefix, signature[1]) if not valid: - raise CorruptShareError(peerid, shnum, "signature is invalid") - - # ok, it's a valid verinfo. Add it to the list of validated - # versions. - self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d" - % (seqnum, base32.b2a(root_hash)[:4], - idlib.shortnodeid_b2a(peerid), shnum, - k, N, segsize, datalength), - parent=lp) - self._valid_versions.add(verinfo) - # We now know that this is a valid candidate verinfo. - + raise CorruptShareError(peerid, shnum, + "signature is invalid") + + # ok, it's a valid verinfo. Add it to the list of validated + # versions. + self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d" + % (seqnum, base32.b2a(root_hash)[:4], + idlib.shortnodeid_b2a(peerid), shnum, + k, n, segsize, datalen), + parent=lp) + self._valid_versions.add(verinfo) + # We now know that this is a valid candidate verinfo. Whether or + # not this instance of it is valid is a matter for the next + # statement; at this point, we just know that if we see this + # version info again, that its signature checks out and that + # we're okay to skip the signature-checking step. + + # (peerid, shnum) are bound in the method invocation. if (peerid, shnum) in self._servermap.bad_shares: # we've been told that the rest of the data in this share is # unusable, so don't add it to the servermap. @@ -688,43 +873,56 @@ class ServermapUpdater: self._servermap.add_new_share(peerid, shnum, verinfo, timestamp) # and the versionmap self.versionmap.add(verinfo, (shnum, peerid, timestamp)) + return verinfo - def _deserialize_pubkey(self, pubkey_s): - verifier = rsa.create_verifying_key_from_string(pubkey_s) - return verifier - def _try_to_extract_privkey(self, data, peerid, shnum, lp): - try: - r = unpack_share(data) - except NeedMoreDataError, e: - # this share won't help us. oh well. - offset = e.encprivkey_offset - length = e.encprivkey_length - self.log("shnum %d on peerid %s: share was too short (%dB) " - "to get the encprivkey; [%d:%d] ought to hold it" % - (shnum, idlib.shortnodeid_b2a(peerid), len(data), - offset, offset+length), - parent=lp) - # NOTE: if uncoordinated writes are taking place, someone might - # change the share (and most probably move the encprivkey) before - # we get a chance to do one of these reads and fetch it. This - # will cause us to see a NotEnoughSharesError(unable to fetch - # privkey) instead of an UncoordinatedWriteError . This is a - # nuisance, but it will go away when we move to DSA-based mutable - # files (since the privkey will be small enough to fit in the - # write cap). + def _got_update_results_one_share(self, results, share): + """ + I record the update results in results. + """ + assert len(results) == 4 + verinfo, blockhashes, start, end = results + (seqnum, + root_hash, + saltish, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] ) - return + # XXX: This should be done for us in the method, so + # presumably you can go in there and fix it. + verinfo = (seqnum, + root_hash, + saltish, + segsize, + datalen, + k, + n, + prefix, + offsets_tuple) - (seqnum, root_hash, IV, k, N, segsize, datalen, - pubkey, signature, share_hash_chain, block_hash_tree, - share_data, enc_privkey) = r + update_data = (blockhashes, start, end) + self._servermap.set_update_data_for_share_and_verinfo(share, + verinfo, + update_data) - return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp) - def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp): + def _deserialize_pubkey(self, pubkey_s): + verifier = rsa.create_verifying_key_from_string(pubkey_s) + return verifier + + def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp): + """ + Given a writekey from a remote server, I validate it against the + writekey stored in my node. If it is valid, then I set the + privkey and encprivkey properties of the node. + """ alleged_privkey_s = self._node._decrypt_privkey(enc_privkey) alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s) if alleged_writekey != self._node.get_writekey(): @@ -797,20 +995,6 @@ class ServermapUpdater: self._queries_completed += 1 self._last_failure = f - def _got_privkey_results(self, datavs, peerid, shnum, started, lp): - now = time.time() - elapsed = now - started - self._status.add_per_server_time(peerid, "privkey", started, elapsed) - self._queries_outstanding.discard(peerid) - if not self._need_privkey: - return - if shnum not in datavs: - self.log("privkey wasn't there when we asked it", - level=log.WEIRD, umid="VA9uDQ") - return - datav = datavs[shnum] - enc_privkey = datav[0] - self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp) def _privkey_query_failed(self, f, peerid, shnum, lp): self._queries_outstanding.discard(peerid) @@ -825,12 +1009,12 @@ class ServermapUpdater: self._servermap.problems.append(f) self._last_failure = f + def _check_for_done(self, res): # exit paths: # return self._send_more_queries(outstanding) : send some more queries # return self._done() : all done # return : keep waiting, no new queries - lp = self.log(format=("_check_for_done, mode is '%(mode)s', " "%(outstanding)d queries outstanding, " "%(extra)d extra peers available, " @@ -1022,6 +1206,7 @@ class ServermapUpdater: def _done(self): if not self._running: + self.log("not running; we're already done") return self._running = False now = time.time() @@ -1036,6 +1221,7 @@ class ServermapUpdater: self._servermap.last_update_time = self._started # the servermap will not be touched after this self.log("servermap: %s" % self._servermap.summarize_versions()) + eventually(self._done_deferred.callback, self._servermap) def _fatal_error(self, f):