From: Brian Warner Date: Sun, 27 Feb 2011 02:11:03 +0000 (-0700) Subject: upload.py: fix var names to avoid confusion between 'trackers' and 'servers' X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri//%22%22.?a=commitdiff_plain;h=0cf9e3b150d9d9175be0bb8cbc998b1bb10c109e;p=tahoe-lafs%2Ftahoe-lafs.git upload.py: fix var names to avoid confusion between 'trackers' and 'servers' --- diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 2758520b..cb7b4028 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -186,19 +186,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.needed_shares = needed_shares self.homeless_shares = set(range(total_shares)) - self.contacted_servers = [] # servers worth asking again - self.contacted_servers2 = [] # servers that we have asked again + self.contacted_trackers = [] # servers worth asking again + self.contacted_trackers2 = [] # servers that we have asked again self._started_second_pass = False - self.use_servers = set() # ServerTrackers that have shares assigned - # to them + self.use_trackers = set() # ServerTrackers that have shares assigned + # to them self.preexisting_shares = {} # shareid => set(serverids) holding shareid - # We don't try to allocate shares to these servers, since they've said - # that they're incapable of storing shares of the size that we'd want - # to store. We keep them around because they may have existing shares - # for this storage index, which we want to know about for accurate - # servers_of_happiness accounting - # (this is eventually a list, but it is initialized later) - self.readonly_servers = None + # These servers have shares -- any shares -- for our SI. We keep # track of these to write an error message with them later. self.servers_with_shares = set() @@ -251,25 +245,32 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): bucket_cancel_secret_hash(file_cancel_secret, serverid)) for (serverid, conn) in servers] - self.uncontacted_servers = _make_trackers(writable_servers) - self.readonly_servers = _make_trackers(readonly_servers) + self.uncontacted_trackers = _make_trackers(writable_servers) + + # We don't try to allocate shares to these servers, since they've + # said that they're incapable of storing shares of the size that we'd + # want to store. We ask them about existing shares for this storage + # index, which we want to know about for accurate + # servers_of_happiness accounting, then we forget about them. + readonly_trackers = _make_trackers(readonly_servers) + # We now ask servers that can't hold any new shares about existing # shares that they might have for our SI. Once this is done, we # start placing the shares that we haven't already accounted # for. ds = [] - if self._status and self.readonly_servers: + if self._status and readonly_trackers: self._status.set_status("Contacting readonly servers to find " "any existing shares") - for server in self.readonly_servers: - assert isinstance(server, ServerTracker) - d = server.ask_about_existing_shares() - d.addBoth(self._handle_existing_response, server.serverid) + for tracker in readonly_trackers: + assert isinstance(tracker, ServerTracker) + d = tracker.ask_about_existing_shares() + d.addBoth(self._handle_existing_response, tracker.serverid) ds.append(d) self.num_servers_contacted += 1 self.query_count += 1 self.log("asking server %s for any existing shares" % - (idlib.shortnodeid_b2a(server.serverid),), + (idlib.shortnodeid_b2a(tracker.serverid),), level=log.NOISY) dl = defer.DeferredList(ds) dl.addCallback(lambda ign: self._loop()) @@ -323,19 +324,19 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): def _loop(self): if not self.homeless_shares: - merged = merge_peers(self.preexisting_shares, self.use_servers) + merged = merge_peers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if self.servers_of_happiness <= effective_happiness: msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " - "self.use_servers: %s, self.preexisting_shares: %s") \ + "self.use_trackers: %s, self.preexisting_shares: %s") \ % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join([str_shareloc(k,v) - for k,v in s.buckets.iteritems()]) - for s in self.use_servers], + for k,v in st.buckets.iteritems()]) + for st in self.use_trackers], pretty_print_shnum_to_servers(self.preexisting_shares)) self.log(msg, level=log.OPERATIONAL) - return (self.use_servers, self.preexisting_shares) + return (self.use_trackers, self.preexisting_shares) else: # We're not okay right now, but maybe we can fix it by # redistributing some shares. In cases where one or two @@ -352,7 +353,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): shares_to_spread = sum([len(list(sharelist)) - 1 for (server, sharelist) in shares.items()]) - if delta <= len(self.uncontacted_servers) and \ + if delta <= len(self.uncontacted_trackers) and \ shares_to_spread >= delta: items = shares.items() while len(self.homeless_shares) < delta: @@ -368,7 +369,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): if not self.preexisting_shares[share]: del self.preexisting_shares[share] items.append((server, sharelist)) - for writer in self.use_servers: + for writer in self.use_trackers: writer.abort_some_buckets(self.homeless_shares) return self._loop() else: @@ -388,10 +389,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.log(servmsg, level=log.INFREQUENT) return self._failed("%s (%s)" % (failmsg, self._get_progress_message())) - if self.uncontacted_servers: - server = self.uncontacted_servers.pop(0) + if self.uncontacted_trackers: + tracker = self.uncontacted_trackers.pop(0) # TODO: don't pre-convert all serverids to ServerTrackers - assert isinstance(server, ServerTracker) + assert isinstance(tracker, ServerTracker) shares_to_ask = set(sorted(self.homeless_shares)[:1]) self.homeless_shares -= shares_to_ask @@ -400,42 +401,42 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): if self._status: self._status.set_status("Contacting Servers [%s] (first query)," " %d shares left.." - % (idlib.shortnodeid_b2a(server.serverid), + % (idlib.shortnodeid_b2a(tracker.serverid), len(self.homeless_shares))) - d = server.query(shares_to_ask) - d.addBoth(self._got_response, server, shares_to_ask, - self.contacted_servers) + d = tracker.query(shares_to_ask) + d.addBoth(self._got_response, tracker, shares_to_ask, + self.contacted_trackers) return d - elif self.contacted_servers: + elif self.contacted_trackers: # ask a server that we've already asked. if not self._started_second_pass: self.log("starting second pass", level=log.NOISY) self._started_second_pass = True num_shares = mathutil.div_ceil(len(self.homeless_shares), - len(self.contacted_servers)) - server = self.contacted_servers.pop(0) + len(self.contacted_trackers)) + tracker = self.contacted_trackers.pop(0) shares_to_ask = set(sorted(self.homeless_shares)[:num_shares]) self.homeless_shares -= shares_to_ask self.query_count += 1 if self._status: self._status.set_status("Contacting Servers [%s] (second query)," " %d shares left.." - % (idlib.shortnodeid_b2a(server.serverid), + % (idlib.shortnodeid_b2a(tracker.serverid), len(self.homeless_shares))) - d = server.query(shares_to_ask) - d.addBoth(self._got_response, server, shares_to_ask, - self.contacted_servers2) + d = tracker.query(shares_to_ask) + d.addBoth(self._got_response, tracker, shares_to_ask, + self.contacted_trackers2) return d - elif self.contacted_servers2: + elif self.contacted_trackers2: # we've finished the second-or-later pass. Move all the remaining - # servers back into self.contacted_servers for the next pass. - self.contacted_servers.extend(self.contacted_servers2) - self.contacted_servers2[:] = [] + # servers back into self.contacted_trackers for the next pass. + self.contacted_trackers.extend(self.contacted_trackers2) + self.contacted_trackers2[:] = [] return self._loop() else: # no more servers. If we haven't placed enough shares, we fail. - merged = merge_peers(self.preexisting_shares, self.use_servers) + merged = merge_peers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if effective_happiness < self.servers_of_happiness: msg = failure_message(len(self.servers_with_shares), @@ -455,20 +456,20 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): msg = ("server selection successful (no more servers) for %s: %s: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged))) self.log(msg, level=log.OPERATIONAL) - return (self.use_servers, self.preexisting_shares) + return (self.use_trackers, self.preexisting_shares) - def _got_response(self, res, server, shares_to_ask, put_server_here): + def _got_response(self, res, tracker, shares_to_ask, put_tracker_here): if isinstance(res, failure.Failure): # This is unusual, and probably indicates a bug or a network # problem. - self.log("%s got error during server selection: %s" % (server, res), + self.log("%s got error during server selection: %s" % (tracker, res), level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 self.homeless_shares |= shares_to_ask - if (self.uncontacted_servers - or self.contacted_servers - or self.contacted_servers2): + if (self.uncontacted_trackers + or self.contacted_trackers + or self.contacted_trackers2): # there is still hope, so just loop pass else: @@ -477,17 +478,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # failure we got: if a coding error causes all servers to fail # in the same way, this allows the common failure to be seen # by the uploader and should help with debugging - msg = ("last failure (from %s) was: %s" % (server, res)) + msg = ("last failure (from %s) was: %s" % (tracker, res)) self.last_failure_msg = msg else: (alreadygot, allocated) = res self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s" - % (idlib.shortnodeid_b2a(server.serverid), + % (idlib.shortnodeid_b2a(tracker.serverid), tuple(sorted(alreadygot)), tuple(sorted(allocated))), level=log.NOISY) progress = False for s in alreadygot: - self.preexisting_shares.setdefault(s, set()).add(server.serverid) + self.preexisting_shares.setdefault(s, set()).add(tracker.serverid) if s in self.homeless_shares: self.homeless_shares.remove(s) progress = True @@ -497,11 +498,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # the ServerTracker will remember which shares were allocated on # that peer. We just have to remember to use them. if allocated: - self.use_servers.add(server) + self.use_trackers.add(tracker) progress = True if allocated or alreadygot: - self.servers_with_shares.add(server.serverid) + self.servers_with_shares.add(tracker.serverid) not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) @@ -532,7 +533,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): else: # if they *were* able to accept everything, they might be # willing to accept even more. - put_server_here.append(server) + put_tracker_here.append(tracker) # now loop return self._loop() @@ -545,11 +546,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): place shares for this file. I then raise an UploadUnhappinessError with my msg argument. """ - for server in self.use_servers: - assert isinstance(server, ServerTracker) - - server.abort() - + for tracker in self.use_trackers: + assert isinstance(tracker, ServerTracker) + tracker.abort() raise UploadUnhappinessError(msg)