From 880758340fb827f678c4108b83b04259ad3124b4 Mon Sep 17 00:00:00 2001 From: Zooko O'Whielacronx Date: Mon, 1 Aug 2011 10:41:43 -0700 Subject: [PATCH] upload.py: apply David-Sarah's advice rename (un)contacted(2) trackers to first_pass/second_pass/next_pass This patch was written by Brian but was re-recorded by Zooko (with David-Sarah looking on) to use darcs replace instead of editing to rename the three variables to their new names. refs #1363 --- src/allmydata/immutable/upload.py | 74 ++++++++++++++++++------------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index aee3e240..199dcb2e 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -173,11 +173,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): num_segments, total_shares, needed_shares, servers_of_happiness): """ - @return: (upload_trackers, already_servers), where upload_trackers is - a set of ServerTracker instances that have agreed to hold + @return: (upload_trackers, already_serverids), where upload_trackers + is a set of ServerTracker instances that have agreed to hold some shares for us (the shareids are stashed inside the - ServerTracker), and already_servers is a dict mapping shnum - to a set of serverids which claim to already have the share. + ServerTracker), and already_serverids is a dict mapping + shnum to a set of serverids for servers which claim to + already have the share. """ if self._status: @@ -188,9 +189,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.needed_shares = needed_shares self.homeless_shares = set(range(total_shares)) - self.contacted_trackers = [] # servers worth asking again - self.contacted_trackers2 = [] # servers that we have asked again - self._started_second_pass = False self.use_trackers = set() # ServerTrackers that have shares assigned # to them self.preexisting_shares = {} # shareid => set(serverids) holding shareid @@ -249,7 +247,21 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): renew, cancel) trackers.append(st) return trackers - self.uncontacted_trackers = _make_trackers(writable_servers) + + # We assign each servers/trackers into one three lists. They all + # start in the "first pass" list. During the first pass, as we ask + # each one to hold a share, we move their tracker to the "second + # pass" list, until the first-pass list is empty. Then during the + # second pass, as we ask each to hold more shares, we move their + # tracker to the "next pass" list, until the second-pass list is + # empty. Then we move everybody from the next-pass list back to the + # second-pass list and repeat the "second" pass (really the third, + # fourth, etc pass), until all shares are assigned, or we've run out + # of potential servers. + self.first_pass_trackers = _make_trackers(writable_servers) + self.second_pass_trackers = [] # servers worth asking again + self.next_pass_trackers = [] # servers that we have asked again + self._started_second_pass = False # We don't try to allocate shares to these servers, since they've # said that they're incapable of storing shares of the size that we'd @@ -356,7 +368,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): shares_to_spread = sum([len(list(sharelist)) - 1 for (server, sharelist) in shares.items()]) - if delta <= len(self.uncontacted_trackers) and \ + if delta <= len(self.first_pass_trackers) and \ shares_to_spread >= delta: items = shares.items() while len(self.homeless_shares) < delta: @@ -392,8 +404,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.log(servmsg, level=log.INFREQUENT) return self._failed("%s (%s)" % (failmsg, self._get_progress_message())) - if self.uncontacted_trackers: - tracker = self.uncontacted_trackers.pop(0) + if self.first_pass_trackers: + tracker = self.first_pass_trackers.pop(0) # TODO: don't pre-convert all serverids to ServerTrackers assert isinstance(tracker, ServerTracker) @@ -408,17 +420,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): len(self.homeless_shares))) d = tracker.query(shares_to_ask) d.addBoth(self._got_response, tracker, shares_to_ask, - self.contacted_trackers) + self.second_pass_trackers) return d - elif self.contacted_trackers: + elif self.second_pass_trackers: # ask a server that we've already asked. if not self._started_second_pass: self.log("starting second pass", level=log.NOISY) self._started_second_pass = True num_shares = mathutil.div_ceil(len(self.homeless_shares), - len(self.contacted_trackers)) - tracker = self.contacted_trackers.pop(0) + len(self.second_pass_trackers)) + tracker = self.second_pass_trackers.pop(0) shares_to_ask = set(sorted(self.homeless_shares)[:num_shares]) self.homeless_shares -= shares_to_ask self.query_count += 1 @@ -429,13 +441,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): len(self.homeless_shares))) d = tracker.query(shares_to_ask) d.addBoth(self._got_response, tracker, shares_to_ask, - self.contacted_trackers2) + self.next_pass_trackers) return d - elif self.contacted_trackers2: + elif self.next_pass_trackers: # we've finished the second-or-later pass. Move all the remaining - # servers back into self.contacted_trackers for the next pass. - self.contacted_trackers.extend(self.contacted_trackers2) - self.contacted_trackers2[:] = [] + # servers back into self.second_pass_trackers for the next pass. + self.second_pass_trackers.extend(self.next_pass_trackers) + self.next_pass_trackers[:] = [] return self._loop() else: # no more servers. If we haven't placed enough shares, we fail. @@ -470,9 +482,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.error_count += 1 self.bad_query_count += 1 self.homeless_shares |= shares_to_ask - if (self.uncontacted_trackers - or self.contacted_trackers - or self.contacted_trackers2): + if (self.first_pass_trackers + or self.second_pass_trackers + or self.next_pass_trackers): # there is still hope, so just loop pass else: @@ -923,27 +935,29 @@ class CHKUploader: d.addCallback(_done) return d - def set_shareholders(self, (upload_trackers, already_servers), encoder): + def set_shareholders(self, (upload_trackers, already_serverids), encoder): """ @param upload_trackers: a sequence of ServerTracker objects that have agreed to hold some shares for us (the shareids are stashed inside the ServerTracker) - @paran already_servers: a dict mapping sharenum to a set of serverids - that claim to already have this share + + @paran already_serverids: a dict mapping sharenum to a set of + serverids for servers that claim to already + have this share """ - msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s" + msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s" values = ([', '.join([str_shareloc(k,v) for k,v in st.buckets.iteritems()]) - for st in upload_trackers], already_servers) + for st in upload_trackers], already_serverids) self.log(msgtempl % values, level=log.OPERATIONAL) # record already-present shares in self._results - self._results.preexisting_shares = len(already_servers) + self._results.preexisting_shares = len(already_serverids) self._server_trackers = {} # k: shnum, v: instance of ServerTracker for tracker in upload_trackers: assert isinstance(tracker, ServerTracker) buckets = {} - servermap = already_servers.copy() + servermap = already_serverids.copy() for tracker in upload_trackers: buckets.update(tracker.buckets) for shnum in tracker.buckets: -- 2.37.2