From: Kevan Date: Sat, 14 Jan 2012 21:58:08 +0000 (-0800) Subject: Add _get_next_allocation method; remove duplicated logic from _loop. X-Git-Url: https://git.rkrishnan.org/simplejson/$rel_link?a=commitdiff_plain;h=4b7d966494c5f2fee962a9a237809bc3c10b68c7;p=tahoe-lafs%2Ftahoe-lafs.git Add _get_next_allocation method; remove duplicated logic from _loop. This is essentially a copy-and-paste of tracker and share selection logic from within _loop to _get_next_allocation. Now, _get_next_allocation is responsible for telling _loop what to do (i.e., which server to use, and which shares to store on that server), and _loop is responsible for doing what _get_next_allocation says to do. This isn't a big change right now, but allows us to move to an external peer selector object more easily (and obviously) in the future. This also unifies the log messages for success and failure, and the exception messages printed on a failure. Before, users would see two slightly different messages depending on where in _loop the allocation was noted as successful (or unsuccessful). This commit chooses the more verbose form of these messages to be used on all successes and failures. --- diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index de6db370..55a1aeb4 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -395,21 +395,20 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.full_count, self.error_count)) - def _loop(self): + def _get_next_allocation(self): + """ + Return the next share allocation that we need to make. + + Specifically, I return a tuple (tracker, shares_to_ask), where + tracker is a ServerTracker instance and shares_to_ask is a set of + shares that we should store on that server. If there are no more + allocations to make, I return None. + """ if not self.homeless_shares: merged = merge_servers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if self.servers_of_happiness <= effective_happiness: - msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " - "self.use_trackers: %s, self.preexisting_shares: %s") \ - % (self, self._get_progress_message(), - pretty_print_shnum_to_servers(merged), - [', '.join([str_shareloc(k,v) - for k,v in st.buckets.iteritems()]) - for st in self.use_trackers], - pretty_print_shnum_to_servers(self.preexisting_shares)) - self.log(msg, level=log.OPERATIONAL) - return (self.use_trackers, self.preexisting_shares) + return None else: # We're not okay right now, but maybe we can fix it by # redistributing some shares. In cases where one or two @@ -444,23 +443,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): items.append((server, sharelist)) for writer in self.use_trackers: writer.abort_some_buckets(self.homeless_shares) - return self._loop() + return self._get_next_allocation() else: # Redistribution won't help us; fail. - server_count = len(self.serverids_with_shares) - failmsg = failure_message(server_count, - self.needed_shares, - self.servers_of_happiness, - effective_happiness) - servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s" - servmsg = servmsgtempl % ( - self, - failmsg, - self._get_progress_message(), - pretty_print_shnum_to_servers(merged) - ) - self.log(servmsg, level=log.INFREQUENT) - return self._failed("%s (%s)" % (failmsg, self._get_progress_message())) + return None if self.first_pass_trackers: tracker = self.first_pass_trackers.pop(0) @@ -468,18 +454,14 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): assert isinstance(tracker, ServerTracker) shares_to_ask = set(sorted(self.homeless_shares)[:1]) - self.homeless_shares -= shares_to_ask - self.query_count += 1 + next_tracker_list = self.second_pass_trackers self.num_servers_contacted += 1 if self._status: self._status.set_status("Contacting Servers [%s] (first query)," " %d shares left.." % (tracker.get_name(), len(self.homeless_shares))) - d = tracker.query(shares_to_ask) - d.addBoth(self._got_response, tracker, shares_to_ask, - self.second_pass_trackers) - return d + elif self.second_pass_trackers: # ask a server that we've already asked. if not self._started_second_pass: @@ -490,23 +472,37 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): len(self.second_pass_trackers)) tracker = self.second_pass_trackers.pop(0) shares_to_ask = set(sorted(self.homeless_shares)[:num_shares]) - self.homeless_shares -= shares_to_ask - self.query_count += 1 + next_tracker_list = self.next_pass_trackers if self._status: self._status.set_status("Contacting Servers [%s] (second query)," " %d shares left.." % (tracker.get_name(), len(self.homeless_shares))) - d = tracker.query(shares_to_ask) - d.addBoth(self._got_response, tracker, shares_to_ask, - self.next_pass_trackers) - return d + elif self.next_pass_trackers: # we've finished the second-or-later pass. Move all the remaining # servers back into self.second_pass_trackers for the next pass. self.second_pass_trackers.extend(self.next_pass_trackers) self.next_pass_trackers[:] = [] - return self._loop() + return self._get_next_allocation() + + else: + # nothing to do + return None + + self.homeless_shares -= shares_to_ask + self.query_count += 1 + return (tracker, shares_to_ask, next_tracker_list) + + + def _loop(self): + allocation = self._get_next_allocation() + if allocation is not None: + tracker, shares_to_ask, next_tracker_list = allocation + d = tracker.query(shares_to_ask) + d.addBoth(self._got_response, tracker, shares_to_ask, + next_tracker_list) + return d else: # no more servers. If we haven't placed enough shares, we fail. merged = merge_servers(self.preexisting_shares, self.use_trackers) @@ -516,8 +512,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.needed_shares, self.servers_of_happiness, effective_happiness) - msg = ("server selection failed for %s: %s (%s)" % - (self, msg, self._get_progress_message())) + msg = ("server selection failed for %s: %s (%s), merged=%s" % + (self, msg, self._get_progress_message(), + pretty_print_shnum_to_servers(merged))) if self.last_failure_msg: msg += " (%s)" % (self.last_failure_msg,) self.log(msg, level=log.UNUSUAL) @@ -526,8 +523,14 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # we placed enough to be happy, so we're done if self._status: self._status.set_status("Placed all shares") - msg = ("server selection successful (no more servers) for %s: %s: %s" % (self, - self._get_progress_message(), pretty_print_shnum_to_servers(merged))) + msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " + "self.use_trackers: %s, self.preexisting_shares: %s") \ + % (self, self._get_progress_message(), + pretty_print_shnum_to_servers(merged), + [', '.join([str_shareloc(k,v) + for k,v in st.buckets.iteritems()]) + for st in self.use_trackers], + pretty_print_shnum_to_servers(self.preexisting_shares)) self.log(msg, level=log.OPERATIONAL) return (self.use_trackers, self.preexisting_shares)