]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
upload.py: apply David-Sarah's advice rename (un)contacted(2) trackers to first_pass...
authorZooko O'Whielacronx <zooko@zooko.com>
Mon, 1 Aug 2011 17:41:43 +0000 (10:41 -0700)
committerZooko O'Whielacronx <zooko@zooko.com>
Mon, 1 Aug 2011 17:41:43 +0000 (10:41 -0700)
This patch was written by Brian but was re-recorded by Zooko (with David-Sarah looking on) to use darcs replace instead of editing to rename the three variables to their new names.
refs #1363

src/allmydata/immutable/upload.py

index aee3e2404191fb2db414bd2ae713ad745c7c4efc..199dcb2ea05bff2833da70b349ecf38724306503 100644 (file)
@@ -173,11 +173,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                          num_segments, total_shares, needed_shares,
                          servers_of_happiness):
         """
-        @return: (upload_trackers, already_servers), where upload_trackers is
-                 a set of ServerTracker instances that have agreed to hold
+        @return: (upload_trackers, already_serverids), where upload_trackers
+                 is a set of ServerTracker instances that have agreed to hold
                  some shares for us (the shareids are stashed inside the
-                 ServerTracker), and already_servers is a dict mapping shnum
-                 to a set of serverids which claim to already have the share.
+                 ServerTracker), and already_serverids is a dict mapping
+                 shnum to a set of serverids for servers which claim to
+                 already have the share.
         """
 
         if self._status:
@@ -188,9 +189,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         self.needed_shares = needed_shares
 
         self.homeless_shares = set(range(total_shares))
-        self.contacted_trackers = [] # servers worth asking again
-        self.contacted_trackers2 = [] # servers that we have asked again
-        self._started_second_pass = False
         self.use_trackers = set() # ServerTrackers that have shares assigned
                                   # to them
         self.preexisting_shares = {} # shareid => set(serverids) holding shareid
@@ -249,7 +247,21 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                                    renew, cancel)
                 trackers.append(st)
             return trackers
-        self.uncontacted_trackers = _make_trackers(writable_servers)
+
+        # We assign each servers/trackers into one three lists. They all
+        # start in the "first pass" list. During the first pass, as we ask
+        # each one to hold a share, we move their tracker to the "second
+        # pass" list, until the first-pass list is empty. Then during the
+        # second pass, as we ask each to hold more shares, we move their
+        # tracker to the "next pass" list, until the second-pass list is
+        # empty. Then we move everybody from the next-pass list back to the
+        # second-pass list and repeat the "second" pass (really the third,
+        # fourth, etc pass), until all shares are assigned, or we've run out
+        # of potential servers.
+        self.first_pass_trackers = _make_trackers(writable_servers)
+        self.second_pass_trackers = [] # servers worth asking again
+        self.next_pass_trackers = [] # servers that we have asked again
+        self._started_second_pass = False
 
         # We don't try to allocate shares to these servers, since they've
         # said that they're incapable of storing shares of the size that we'd
@@ -356,7 +368,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 shares_to_spread = sum([len(list(sharelist)) - 1
                                         for (server, sharelist)
                                         in shares.items()])
-                if delta <= len(self.uncontacted_trackers) and \
+                if delta <= len(self.first_pass_trackers) and \
                    shares_to_spread >= delta:
                     items = shares.items()
                     while len(self.homeless_shares) < delta:
@@ -392,8 +404,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                     self.log(servmsg, level=log.INFREQUENT)
                     return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
 
-        if self.uncontacted_trackers:
-            tracker = self.uncontacted_trackers.pop(0)
+        if self.first_pass_trackers:
+            tracker = self.first_pass_trackers.pop(0)
             # TODO: don't pre-convert all serverids to ServerTrackers
             assert isinstance(tracker, ServerTracker)
 
@@ -408,17 +420,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                                            len(self.homeless_shares)))
             d = tracker.query(shares_to_ask)
             d.addBoth(self._got_response, tracker, shares_to_ask,
-                      self.contacted_trackers)
+                      self.second_pass_trackers)
             return d
-        elif self.contacted_trackers:
+        elif self.second_pass_trackers:
             # ask a server that we've already asked.
             if not self._started_second_pass:
                 self.log("starting second pass",
                         level=log.NOISY)
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                           len(self.contacted_trackers))
-            tracker = self.contacted_trackers.pop(0)
+                                           len(self.second_pass_trackers))
+            tracker = self.second_pass_trackers.pop(0)
             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
             self.homeless_shares -= shares_to_ask
             self.query_count += 1
@@ -429,13 +441,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                                            len(self.homeless_shares)))
             d = tracker.query(shares_to_ask)
             d.addBoth(self._got_response, tracker, shares_to_ask,
-                      self.contacted_trackers2)
+                      self.next_pass_trackers)
             return d
-        elif self.contacted_trackers2:
+        elif self.next_pass_trackers:
             # we've finished the second-or-later pass. Move all the remaining
-            # servers back into self.contacted_trackers for the next pass.
-            self.contacted_trackers.extend(self.contacted_trackers2)
-            self.contacted_trackers2[:] = []
+            # servers back into self.second_pass_trackers for the next pass.
+            self.second_pass_trackers.extend(self.next_pass_trackers)
+            self.next_pass_trackers[:] = []
             return self._loop()
         else:
             # no more servers. If we haven't placed enough shares, we fail.
@@ -470,9 +482,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             self.error_count += 1
             self.bad_query_count += 1
             self.homeless_shares |= shares_to_ask
-            if (self.uncontacted_trackers
-                or self.contacted_trackers
-                or self.contacted_trackers2):
+            if (self.first_pass_trackers
+                or self.second_pass_trackers
+                or self.next_pass_trackers):
                 # there is still hope, so just loop
                 pass
             else:
@@ -923,27 +935,29 @@ class CHKUploader:
         d.addCallback(_done)
         return d
 
-    def set_shareholders(self, (upload_trackers, already_servers), encoder):
+    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
         """
         @param upload_trackers: a sequence of ServerTracker objects that
                                 have agreed to hold some shares for us (the
                                 shareids are stashed inside the ServerTracker)
-        @paran already_servers: a dict mapping sharenum to a set of serverids
-                                that claim to already have this share
+
+        @paran already_serverids: a dict mapping sharenum to a set of
+                                  serverids for servers that claim to already
+                                  have this share
         """
-        msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s"
+        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
         values = ([', '.join([str_shareloc(k,v)
                               for k,v in st.buckets.iteritems()])
-                   for st in upload_trackers], already_servers)
+                   for st in upload_trackers], already_serverids)
         self.log(msgtempl % values, level=log.OPERATIONAL)
         # record already-present shares in self._results
-        self._results.preexisting_shares = len(already_servers)
+        self._results.preexisting_shares = len(already_serverids)
 
         self._server_trackers = {} # k: shnum, v: instance of ServerTracker
         for tracker in upload_trackers:
             assert isinstance(tracker, ServerTracker)
         buckets = {}
-        servermap = already_servers.copy()
+        servermap = already_serverids.copy()
         for tracker in upload_trackers:
             buckets.update(tracker.buckets)
             for shnum in tracker.buckets: