]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Eliminate overcounting iof servers_of_happiness in Tahoe2PeerSelector; also reorganiz...
authorKevan Carstensen <kevan@isnotajoke.com>
Wed, 18 Nov 2009 02:45:42 +0000 (19:45 -0700)
committerKevan Carstensen <kevan@isnotajoke.com>
Wed, 18 Nov 2009 02:45:42 +0000 (19:45 -0700)
src/allmydata/immutable/upload.py

index 3477ed5f8b21ee54deeb19271a805c5a051bcb87..1e8289ec641a11023e8fe0fda954ceddc6f4ca40 100644 (file)
@@ -138,8 +138,21 @@ class PeerTracker:
         return (alreadygot, set(b.keys()))
 
 def servers_with_unique_shares(existing_shares, used_peers=None):
+    """
+    I accept a dict of shareid -> peerid mappings (and optionally a list
+    of PeerTracker instances) and return a list of servers that have shares.
+    """
     servers = []
+    existing_shares = existing_shares.copy()
     if used_peers:
+        peerdict = {}
+        for peer in used_peers:
+            peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
+        for k in peerdict.keys():
+            if existing_shares.has_key(k):
+                # Prevent overcounting; favor the bucket, and not the 
+                # prexisting share.
+                del(existing_shares[k])
         peers = list(used_peers.copy())
         # We do this because the preexisting shares list goes by peerid.
         peers = [x.peerid for x in peers]
@@ -148,12 +161,29 @@ def servers_with_unique_shares(existing_shares, used_peers=None):
     return list(set(servers))
 
 def shares_by_server(existing_shares):
+    """
+    I accept a dict of shareid -> peerid mappings, and return a dict
+    of peerid -> shareid mappings
+    """
     servers = {}
     for server in set(existing_shares.values()):
         servers[server] = set([x for x in existing_shares.keys()
                                if existing_shares[x] == server])
     return servers
 
+def should_add_server(existing_shares, server, bucket):
+    """
+    I tell my caller whether the servers_of_happiness number will be
+    increased or decreased if a particular server is added as the peer
+    already holding a particular share. I take a dictionary, a peerid,
+    and a bucket as arguments, and return a boolean.
+    """
+    old_size = len(servers_with_unique_shares(existing_shares))
+    new_candidate = existing_shares.copy()
+    new_candidate[bucket] = server
+    new_size = len(servers_with_unique_shares(new_candidate))
+    return old_size < new_size
+
 class Tahoe2PeerSelector:
 
     def __init__(self, upload_id, logparent=None, upload_status=None):
@@ -261,14 +291,15 @@ class Tahoe2PeerSelector:
             peer = self.readonly_peers.pop()
             assert isinstance(peer, PeerTracker)
             d = peer.query_allocated()
-            d.addCallback(self._handle_allocate_response)
+            d.addCallback(self._handle_existing_response)
             return d
 
-    def _handle_allocate_response(self, (peer, buckets)):
+    def _handle_existing_response(self, (peer, buckets)):
         for bucket in buckets:
-            self.preexisting_shares[bucket] = peer
-            if self.homeless_shares:
-                self.homeless_shares.remove(bucket)
+            if should_add_server(self.preexisting_shares, peer, bucket):
+                self.preexisting_shares[bucket] = peer
+                if self.homeless_shares and bucket in self.homeless_shares:
+                    self.homeless_shares.remove(bucket)
         return self._existing_shares()
 
     def _loop(self):
@@ -312,10 +343,10 @@ class Tahoe2PeerSelector:
                             items.append((servernum, sharelist))
                     return self._loop()
                 else:
-                    raise NotEnoughSharesError("shares could only be placed on %d "
-                                            "servers (%d were requested)" %
-                                            (len(effective_happiness),
-                                             self.servers_of_happiness))
+                    raise NotEnoughSharesError("shares could only be placed "
+                                   "on %d servers (%d were requested)" %
+                                   (len(effective_happiness),
+                                   self.servers_of_happiness))
 
         if self.uncontacted_peers:
             peer = self.uncontacted_peers.pop(0)
@@ -391,7 +422,7 @@ class Tahoe2PeerSelector:
                 # we placed enough to be happy, so we're done
                 if self._status:
                     self._status.set_status("Placed all shares")
-                return self.use_peers
+                return (self.use_peers, self.preexisting_shares)
 
     def _got_response(self, res, peer, shares_to_ask, put_peer_here):
         if isinstance(res, failure.Failure):
@@ -422,16 +453,12 @@ class Tahoe2PeerSelector:
                     level=log.NOISY, parent=self._log_parent)
             progress = False
             for s in alreadygot:
-                if self.preexisting_shares.has_key(s):
-                    old_size = len(servers_with_unique_shares(self.preexisting_shares))
-                    new_candidate = self.preexisting_shares.copy()
-                    new_candidate[s] = peer.peerid
-                    new_size = len(servers_with_unique_shares(new_candidate))
-                    if old_size >= new_size: continue
-                self.preexisting_shares[s] = peer.peerid
-                if s in self.homeless_shares:
-                    self.homeless_shares.remove(s)
-                    progress = True
+                if should_add_server(self.preexisting_shares,
+                                     peer.peerid, s):
+                    self.preexisting_shares[s] = peer.peerid
+                    if s in self.homeless_shares:
+                        self.homeless_shares.remove(s)
+                        progress = True
 
             # the PeerTracker will remember which shares were allocated on
             # that peer. We just have to remember to use them.