]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
upload.py: more tracker-vs-server cleanup
authorBrian Warner <warner@lothar.com>
Sun, 27 Feb 2011 02:11:07 +0000 (19:11 -0700)
committerBrian Warner <warner@lothar.com>
Sun, 27 Feb 2011 02:11:07 +0000 (19:11 -0700)
src/allmydata/immutable/upload.py

index cb7b40280ea709e2f11c8f314838d9aed01c6000..b36a435ce67ed7ec70e35dfd32327c9f10cf15d0 100644 (file)
@@ -171,11 +171,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                          num_segments, total_shares, needed_shares,
                          servers_of_happiness):
         """
-        @return: (upload_servers, already_servers), where upload_servers is
+        @return: (upload_trackers, already_servers), where upload_trackers is
                  a set of ServerTracker instances that have agreed to hold
                  some shares for us (the shareids are stashed inside the
                  ServerTracker), and already_servers is a dict mapping shnum
-                 to a set of servers which claim to already have the share.
+                 to a set of serverids which claim to already have the share.
         """
 
         if self._status:
@@ -195,7 +195,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
         # These servers have shares -- any shares -- for our SI. We keep
         # track of these to write an error message with them later.
-        self.servers_with_shares = set()
+        self.serverids_with_shares = set()
 
         # this needed_hashes computation should mirror
         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
@@ -277,26 +277,26 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         return dl
 
 
-    def _handle_existing_response(self, res, server):
+    def _handle_existing_response(self, res, serverid):
         """
         I handle responses to the queries sent by
         Tahoe2ServerSelector._existing_shares.
         """
         if isinstance(res, failure.Failure):
             self.log("%s got error during existing shares check: %s"
-                    % (idlib.shortnodeid_b2a(server), res),
+                    % (idlib.shortnodeid_b2a(serverid), res),
                     level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
         else:
             buckets = res
             if buckets:
-                self.servers_with_shares.add(server)
+                self.serverids_with_shares.add(serverid)
             self.log("response to get_buckets() from server %s: alreadygot=%s"
-                    % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))),
+                    % (idlib.shortnodeid_b2a(serverid), tuple(sorted(buckets))),
                     level=log.NOISY)
             for bucket in buckets:
-                self.preexisting_shares.setdefault(bucket, set()).add(server)
+                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                 self.homeless_shares.discard(bucket)
             self.full_count += 1
             self.bad_query_count += 1
@@ -374,7 +374,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                     return self._loop()
                 else:
                     # Redistribution won't help us; fail.
-                    server_count = len(self.servers_with_shares)
+                    server_count = len(self.serverids_with_shares)
                     failmsg = failure_message(server_count,
                                               self.needed_shares,
                                               self.servers_of_happiness,
@@ -439,7 +439,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             merged = merge_peers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if effective_happiness < self.servers_of_happiness:
-                msg = failure_message(len(self.servers_with_shares),
+                msg = failure_message(len(self.serverids_with_shares),
                                       self.needed_shares,
                                       self.servers_of_happiness,
                                       effective_happiness)
@@ -502,7 +502,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 progress = True
 
             if allocated or alreadygot:
-                self.servers_with_shares.add(tracker.serverid)
+                self.serverids_with_shares.add(tracker.serverid)
 
             not_yet_present = set(shares_to_ask) - set(alreadygot)
             still_homeless = not_yet_present - set(allocated)
@@ -920,38 +920,39 @@ class CHKUploader:
         d.addCallback(_done)
         return d
 
-    def set_shareholders(self, (upload_servers, already_servers), encoder):
+    def set_shareholders(self, (upload_trackers, already_servers), encoder):
         """
-        @param upload_servers: a sequence of ServerTracker objects that
-                               have agreed to hold some shares for us (the
-                               shareids are stashed inside the ServerTracker)
+        @param upload_trackers: a sequence of ServerTracker objects that
+                                have agreed to hold some shares for us (the
+                                shareids are stashed inside the ServerTracker)
         @paran already_servers: a dict mapping sharenum to a set of serverids
                                 that claim to already have this share
         """
-        msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s"
-        values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()])
-            for s in upload_servers], already_servers)
+        msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s"
+        values = ([', '.join([str_shareloc(k,v)
+                              for k,v in st.buckets.iteritems()])
+                   for st in upload_trackers], already_servers)
         self.log(msgtempl % values, level=log.OPERATIONAL)
         # record already-present shares in self._results
         self._results.preexisting_shares = len(already_servers)
 
         self._server_trackers = {} # k: shnum, v: instance of ServerTracker
-        for server in upload_servers:
-            assert isinstance(server, ServerTracker)
+        for tracker in upload_trackers:
+            assert isinstance(tracker, ServerTracker)
         buckets = {}
         servermap = already_servers.copy()
-        for server in upload_servers:
-            buckets.update(server.buckets)
-            for shnum in server.buckets:
-                self._server_trackers[shnum] = server
-                servermap.setdefault(shnum, set()).add(server.serverid)
-        assert len(buckets) == sum([len(server.buckets)
-                                    for server in upload_servers]), \
+        for tracker in upload_trackers:
+            buckets.update(tracker.buckets)
+            for shnum in tracker.buckets:
+                self._server_trackers[shnum] = tracker
+                servermap.setdefault(shnum, set()).add(tracker.serverid)
+        assert len(buckets) == sum([len(tracker.buckets)
+                                    for tracker in upload_trackers]), \
             "%s (%s) != %s (%s)" % (
                 len(buckets),
                 buckets,
-                sum([len(server.buckets) for server in upload_servers]),
-                [(s.buckets, s.serverid) for s in upload_servers]
+                sum([len(tracker.buckets) for tracker in upload_trackers]),
+                [(t.buckets, t.serverid) for t in upload_trackers]
                 )
         encoder.set_shareholders(buckets, servermap)