From 808f85158989ebc7cbcfdb6fd2716c9d9239ddf2 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Sun, 16 Sep 2007 17:08:34 -0700
Subject: [PATCH] upload: make peer-selection a bit more uniform. Closes #132.
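
The old loop marked the start of each pass with a "start" sentinel in
contacted_peers and computed shares_per_peer once per pass, so with 10
shares on 3 servers the shares landed 4+4+2. The loop now keeps two
lists: peers that accepted everything they were asked move into
contacted_peers2, the whole batch is recycled back into contacted_peers
when a pass ends, and div_ceil(homeless shares, remaining peers) is
recomputed before every ask, which yields 3+3+4 instead.

A standalone sketch (illustrative only, not the selector code) of the
intended effect, assuming the first pass hands each peer a single share
(as the 50-shares/50-peers test expects) and every peer accepts
whatever it is asked to hold:

    def sketch_spread(total_shares, peers):
        # illustrative names; the real selector works on PeerTrackers
        # and issues queries over the wire
        homeless = list(range(total_shares))
        placed = dict((p, set()) for p in peers)
        # first pass: one share per peer
        for p in peers:
            if homeless:
                placed[p].add(homeless.pop(0))
        # later passes: recompute ceil(remaining / peers_left) before
        # each ask, so leftovers spread out instead of piling onto the
        # peers asked earliest
        remaining = list(peers)
        while homeless and remaining:
            ask = (len(homeless) + len(remaining) - 1) // len(remaining)
            p = remaining.pop(0)
            placed[p].update(homeless[:ask])
            del homeless[:ask]
        return placed

    # sketch_spread(10, "ABC") places 4+3+3 shares, not 4+4+2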

---
 src/allmydata/test/test_upload.py | 33 ++++++++++++++++++++++----
 src/allmydata/upload.py           | 39 +++++++++++++++++--------------
 2 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index a1c32284..eaf2f52b 100644
--- a/src/allmydata/test/test_upload.py
+++ b/src/allmydata/test/test_upload.py
@@ -132,11 +132,12 @@ class FakeBucketWriter:
         self.closed = True
 
 class FakeClient:
-    def __init__(self, mode="good"):
+    def __init__(self, mode="good", num_servers=50):
         self.mode = mode
+        self.num_servers = num_servers
     def get_permuted_peers(self, storage_index, include_myself):
         peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
-                  for fakeid in range(50) ]
+                  for fakeid in range(self.num_servers) ]
         self.last_peers = [p[2] for p in peers]
         return peers
     def get_push_to_ourselves(self):
@@ -276,8 +277,9 @@ class FullServer(unittest.TestCase):
         return d
 
 class PeerSelection(unittest.TestCase):
-    def setUp(self):
-        self.node = FakeClient(mode="good")
+
+    def make_client(self, num_servers=50):
+        self.node = FakeClient(mode="good", num_servers=num_servers)
         self.u = upload.Uploader()
         self.u.running = True
         self.u.parent = self.node
@@ -298,6 +300,7 @@ class PeerSelection(unittest.TestCase):
         # if we have 50 shares, and there are 50 peers, and they all accept a
         # share, we should get exactly one share per peer
 
+        self.make_client()
         data = self.get_data(SIZE_LARGE)
         self.u.DEFAULT_ENCODING_PARAMETERS = (25, 30, 50)
         d = self.u.upload_data(data)
@@ -314,6 +317,7 @@ class PeerSelection(unittest.TestCase):
         # if we have 100 shares, and there are 50 peers, and they all accept
         # all shares, we should get exactly two shares per peer
 
+        self.make_client()
         data = self.get_data(SIZE_LARGE)
         self.u.DEFAULT_ENCODING_PARAMETERS = (50, 75, 100)
         d = self.u.upload_data(data)
@@ -330,6 +334,7 @@ class PeerSelection(unittest.TestCase):
         # if we have 51 shares, and there are 50 peers, then one peer gets
         # two shares and the rest get just one
 
+        self.make_client()
         data = self.get_data(SIZE_LARGE)
         self.u.DEFAULT_ENCODING_PARAMETERS = (24, 41, 51)
         d = self.u.upload_data(data)
@@ -356,6 +361,7 @@ class PeerSelection(unittest.TestCase):
         # 4 shares. The design goal is to accomplish this with only two
         # queries per peer.
 
+        self.make_client()
         data = self.get_data(SIZE_LARGE)
         self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200)
         d = self.u.upload_data(data)
@@ -368,6 +374,25 @@ class PeerSelection(unittest.TestCase):
         d.addCallback(_check)
         return d
 
+    def test_three_of_ten(self):
+        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
+        # 4+4+2
+
+        self.make_client(3)
+        data = self.get_data(SIZE_LARGE)
+        self.u.DEFAULT_ENCODING_PARAMETERS = (3, 5, 10)
+        d = self.u.upload_data(data)
+        d.addCallback(self._check_large, SIZE_LARGE)
+        def _check(res):
+            counts = {}
+            for p in self.node.last_peers:
+                allocated = p.ss.allocated
+                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
+            histogram = [counts.get(i, 0) for i in range(5)]
+            self.failUnlessEqual(histogram, [0,0,0,2,1])
+        d.addCallback(_check)
+        return d
+
 
 # TODO:
 #  upload with exactly 75 peers (shares_of_happiness)
diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index f01ce7dc..db95901f 100644
--- a/src/allmydata/upload.py
+++ b/src/allmydata/upload.py
@@ -124,7 +124,8 @@ class Tahoe2PeerSelector:
 
         self.homeless_shares = range(total_shares)
         # self.uncontacted_peers = list() # peers we haven't asked yet
-        self.contacted_peers = ["start"] # peers worth asking again
+        self.contacted_peers = [] # peers worth asking again
+        self.contacted_peers2 = [] # peers that we have asked again
         self.use_peers = set() # PeerTrackers that have shares assigned to them
         self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
 
@@ -189,25 +190,27 @@ class Tahoe2PeerSelector:
             self.query_count += 1
             self.num_peers_contacted += 1
             d = peer.query(shares_to_ask)
-            d.addBoth(self._got_response, peer, shares_to_ask)
+            d.addBoth(self._got_response, peer, shares_to_ask,
+                      self.contacted_peers)
             return d
-        elif len(self.contacted_peers) > 1:
+        elif self.contacted_peers:
             # ask a peer that we've already asked.
+            num_shares = mathutil.div_ceil(len(self.homeless_shares),
+                                           len(self.contacted_peers))
             peer = self.contacted_peers.pop(0)
-            if peer == "start":
-                # we're at the beginning of the list, so re-calculate
-                # shares_per_peer
-                num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                               len(self.contacted_peers))
-                self.shares_per_peer = num_shares
-                self.contacted_peers.append("start")
-                peer = self.contacted_peers.pop(0)
-            shares_to_ask = set(self.homeless_shares[:self.shares_per_peer])
-            self.homeless_shares[:self.shares_per_peer] = []
+            shares_to_ask = set(self.homeless_shares[:num_shares])
+            self.homeless_shares[:num_shares] = []
             self.query_count += 1
             d = peer.query(shares_to_ask)
-            d.addBoth(self._got_response, peer, shares_to_ask)
+            d.addBoth(self._got_response, peer, shares_to_ask,
+                      self.contacted_peers2)
             return d
+        elif self.contacted_peers2:
+            # we've finished the second-or-later pass. Move all the remaining
+            # peers back into self.contacted_peers for the next pass.
+            self.contacted_peers.extend(self.contacted_peers2)
+            self.contacted_peers2[:] = []
+            return self._loop()
         else:
             # no more peers. If we haven't placed enough shares, we fail.
             placed_shares = self.total_shares - len(self.homeless_shares)
@@ -230,14 +233,16 @@ class Tahoe2PeerSelector:
                 # we placed enough to be happy, so we're done
                 return self.use_peers
 
-    def _got_response(self, res, peer, shares_to_ask):
+    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
             log.msg("%s got error during peer selection: %s" % (peer, res))
             self.error_count += 1
             self.homeless_shares = list(shares_to_ask) + self.homeless_shares
-            if self.uncontacted_peers or len(self.contacted_peers) > 1:
+            if (self.uncontacted_peers
+                or self.contacted_peers
+                or self.contacted_peers2):
                 # there is still hope, so just loop
                 pass
             else:
@@ -290,7 +295,7 @@ class Tahoe2PeerSelector:
             else:
                 # if they *were* able to accept everything, they might be
                 # willing to accept even more.
-                self.contacted_peers.append(peer)
+                put_peer_here.append(peer)
 
         # now loop
         return self._loop()
-- 
2.45.2