upload.py: fix var names to avoid confusion between 'trackers' and 'servers'
authorBrian Warner <warner@lothar.com>
Sun, 27 Feb 2011 02:11:03 +0000 (19:11 -0700)
committerBrian Warner <warner@lothar.com>
Sun, 27 Feb 2011 02:11:03 +0000 (19:11 -0700)
src/allmydata/immutable/upload.py

index 2758520b092e0c0e7b5015877ba1aa35329184a2..cb7b40280ea709e2f11c8f314838d9aed01c6000 100644 (file)
@@ -186,19 +186,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         self.needed_shares = needed_shares
 
         self.homeless_shares = set(range(total_shares))
-        self.contacted_servers = [] # servers worth asking again
-        self.contacted_servers2 = [] # servers that we have asked again
+        self.contacted_trackers = [] # servers worth asking again
+        self.contacted_trackers2 = [] # servers that we have asked again
         self._started_second_pass = False
-        self.use_servers = set() # ServerTrackers that have shares assigned
-                                 # to them
+        self.use_trackers = set() # ServerTrackers that have shares assigned
+                                  # to them
         self.preexisting_shares = {} # shareid => set(serverids) holding shareid
-        # We don't try to allocate shares to these servers, since they've said
-        # that they're incapable of storing shares of the size that we'd want
-        # to store. We keep them around because they may have existing shares
-        # for this storage index, which we want to know about for accurate
-        # servers_of_happiness accounting
-        # (this is eventually a list, but it is initialized later)
-        self.readonly_servers = None
+
         # These servers have shares -- any shares -- for our SI. We keep
         # track of these to write an error message with them later.
         self.servers_with_shares = set()
@@ -251,25 +245,32 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                                  bucket_cancel_secret_hash(file_cancel_secret,
                                                            serverid))
                    for (serverid, conn) in servers]
-        self.uncontacted_servers = _make_trackers(writable_servers)
-        self.readonly_servers = _make_trackers(readonly_servers)
+        self.uncontacted_trackers = _make_trackers(writable_servers)
+
+        # We don't try to allocate shares to these servers, since they've
+        # said that they're incapable of storing shares of the size that we'd
+        # want to store. We ask them about existing shares for this storage
+        # index, which we want to know about for accurate
+        # servers_of_happiness accounting, then we forget about them.
+        readonly_trackers = _make_trackers(readonly_servers)
+
         # We now ask servers that can't hold any new shares about existing
         # shares that they might have for our SI. Once this is done, we
         # start placing the shares that we haven't already accounted
         # for.
         ds = []
-        if self._status and self.readonly_servers:
+        if self._status and readonly_trackers:
             self._status.set_status("Contacting readonly servers to find "
                                     "any existing shares")
-        for server in self.readonly_servers:
-            assert isinstance(server, ServerTracker)
-            d = server.ask_about_existing_shares()
-            d.addBoth(self._handle_existing_response, server.serverid)
+        for tracker in readonly_trackers:
+            assert isinstance(tracker, ServerTracker)
+            d = tracker.ask_about_existing_shares()
+            d.addBoth(self._handle_existing_response, tracker.serverid)
             ds.append(d)
             self.num_servers_contacted += 1
             self.query_count += 1
             self.log("asking server %s for any existing shares" %
-                     (idlib.shortnodeid_b2a(server.serverid),),
+                     (idlib.shortnodeid_b2a(tracker.serverid),),
                     level=log.NOISY)
         dl = defer.DeferredList(ds)
         dl.addCallback(lambda ign: self._loop())
@@ -323,19 +324,19 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
     def _loop(self):
         if not self.homeless_shares:
-            merged = merge_peers(self.preexisting_shares, self.use_servers)
+            merged = merge_peers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if self.servers_of_happiness <= effective_happiness:
                 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
-                       "self.use_servers: %s, self.preexisting_shares: %s") \
+                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                        % (self, self._get_progress_message(),
                           pretty_print_shnum_to_servers(merged),
                           [', '.join([str_shareloc(k,v)
-                                      for k,v in s.buckets.iteritems()])
-                           for s in self.use_servers],
+                                      for k,v in st.buckets.iteritems()])
+                           for st in self.use_trackers],
                           pretty_print_shnum_to_servers(self.preexisting_shares))
                 self.log(msg, level=log.OPERATIONAL)
-                return (self.use_servers, self.preexisting_shares)
+                return (self.use_trackers, self.preexisting_shares)
             else:
                 # We're not okay right now, but maybe we can fix it by
                 # redistributing some shares. In cases where one or two
@@ -352,7 +353,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 shares_to_spread = sum([len(list(sharelist)) - 1
                                         for (server, sharelist)
                                         in shares.items()])
-                if delta <= len(self.uncontacted_servers) and \
+                if delta <= len(self.uncontacted_trackers) and \
                    shares_to_spread >= delta:
                     items = shares.items()
                     while len(self.homeless_shares) < delta:
@@ -368,7 +369,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                             if not self.preexisting_shares[share]:
                                 del self.preexisting_shares[share]
                             items.append((server, sharelist))
-                        for writer in self.use_servers:
+                        for writer in self.use_trackers:
                             writer.abort_some_buckets(self.homeless_shares)
                     return self._loop()
                 else:
@@ -388,10 +389,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                     self.log(servmsg, level=log.INFREQUENT)
                     return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
 
-        if self.uncontacted_servers:
-            server = self.uncontacted_servers.pop(0)
+        if self.uncontacted_trackers:
+            tracker = self.uncontacted_trackers.pop(0)
             # TODO: don't pre-convert all serverids to ServerTrackers
-            assert isinstance(server, ServerTracker)
+            assert isinstance(tracker, ServerTracker)
 
             shares_to_ask = set(sorted(self.homeless_shares)[:1])
             self.homeless_shares -= shares_to_ask
@@ -400,42 +401,42 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             if self._status:
                 self._status.set_status("Contacting Servers [%s] (first query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(server.serverid),
+                                        % (idlib.shortnodeid_b2a(tracker.serverid),
                                            len(self.homeless_shares)))
-            d = server.query(shares_to_ask)
-            d.addBoth(self._got_response, server, shares_to_ask,
-                      self.contacted_servers)
+            d = tracker.query(shares_to_ask)
+            d.addBoth(self._got_response, tracker, shares_to_ask,
+                      self.contacted_trackers)
             return d
-        elif self.contacted_servers:
+        elif self.contacted_trackers:
             # ask a server that we've already asked.
             if not self._started_second_pass:
                 self.log("starting second pass",
                         level=log.NOISY)
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                           len(self.contacted_servers))
-            server = self.contacted_servers.pop(0)
+                                           len(self.contacted_trackers))
+            tracker = self.contacted_trackers.pop(0)
             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
             self.homeless_shares -= shares_to_ask
             self.query_count += 1
             if self._status:
                 self._status.set_status("Contacting Servers [%s] (second query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(server.serverid),
+                                        % (idlib.shortnodeid_b2a(tracker.serverid),
                                            len(self.homeless_shares)))
-            d = server.query(shares_to_ask)
-            d.addBoth(self._got_response, server, shares_to_ask,
-                      self.contacted_servers2)
+            d = tracker.query(shares_to_ask)
+            d.addBoth(self._got_response, tracker, shares_to_ask,
+                      self.contacted_trackers2)
             return d
-        elif self.contacted_servers2:
+        elif self.contacted_trackers2:
             # we've finished the second-or-later pass. Move all the remaining
-            # servers back into self.contacted_servers for the next pass.
-            self.contacted_servers.extend(self.contacted_servers2)
-            self.contacted_servers2[:] = []
+            # servers back into self.contacted_trackers for the next pass.
+            self.contacted_trackers.extend(self.contacted_trackers2)
+            self.contacted_trackers2[:] = []
             return self._loop()
         else:
             # no more servers. If we haven't placed enough shares, we fail.
-            merged = merge_peers(self.preexisting_shares, self.use_servers)
+            merged = merge_peers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if effective_happiness < self.servers_of_happiness:
                 msg = failure_message(len(self.servers_with_shares),
@@ -455,20 +456,20 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                             self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                 self.log(msg, level=log.OPERATIONAL)
-                return (self.use_servers, self.preexisting_shares)
+                return (self.use_trackers, self.preexisting_shares)
 
-    def _got_response(self, res, server, shares_to_ask, put_server_here):
+    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
-            self.log("%s got error during server selection: %s" % (server, res),
+            self.log("%s got error during server selection: %s" % (tracker, res),
                     level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
             self.homeless_shares |= shares_to_ask
-            if (self.uncontacted_servers
-                or self.contacted_servers
-                or self.contacted_servers2):
+            if (self.uncontacted_trackers
+                or self.contacted_trackers
+                or self.contacted_trackers2):
                 # there is still hope, so just loop
                 pass
             else:
@@ -477,17 +478,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 # failure we got: if a coding error causes all servers to fail
                 # in the same way, this allows the common failure to be seen
                 # by the uploader and should help with debugging
-                msg = ("last failure (from %s) was: %s" % (server, res))
+                msg = ("last failure (from %s) was: %s" % (tracker, res))
                 self.last_failure_msg = msg
         else:
             (alreadygot, allocated) = res
             self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
-                    % (idlib.shortnodeid_b2a(server.serverid),
+                    % (idlib.shortnodeid_b2a(tracker.serverid),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                     level=log.NOISY)
             progress = False
             for s in alreadygot:
-                self.preexisting_shares.setdefault(s, set()).add(server.serverid)
+                self.preexisting_shares.setdefault(s, set()).add(tracker.serverid)
                 if s in self.homeless_shares:
                     self.homeless_shares.remove(s)
                     progress = True
@@ -497,11 +498,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             # the ServerTracker will remember which shares were allocated on
             # that peer. We just have to remember to use them.
             if allocated:
-                self.use_servers.add(server)
+                self.use_trackers.add(tracker)
                 progress = True
 
             if allocated or alreadygot:
-                self.servers_with_shares.add(server.serverid)
+                self.servers_with_shares.add(tracker.serverid)
 
             not_yet_present = set(shares_to_ask) - set(alreadygot)
             still_homeless = not_yet_present - set(allocated)
@@ -532,7 +533,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             else:
                 # if they *were* able to accept everything, they might be
                 # willing to accept even more.
-                put_server_here.append(server)
+                put_tracker_here.append(tracker)
 
         # now loop
         return self._loop()
@@ -545,11 +546,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         place shares for this file. I then raise an
         UploadUnhappinessError with my msg argument.
         """
-        for server in self.use_servers:
-            assert isinstance(server, ServerTracker)
-
-            server.abort()
-
+        for tracker in self.use_trackers:
+            assert isinstance(tracker, ServerTracker)
+            tracker.abort()
         raise UploadUnhappinessError(msg)