From 24e6ccddce2f3d3c4153946fbbc753c7e4884ba0 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Sun, 16 Sep 2007 01:53:00 -0700
Subject: [PATCH] peer-selection: if we must loop, send a minimal number of
 queries (by asking for more than one share per peer on the second pass)

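Previously the second-pass ask size was recomputed before every query
(div_ceil of however many shares were still homeless over however many
peers were still worth asking), so the asks shrank as shares were
placed: later peers were asked for fewer shares, which could leave
shares for yet another round of queries and give peers unequal numbers
of shares. The ask size is now fixed at the start of each pass: a
"start" marker cycles through contacted_peers, and shares_per_peer is
re-derived only when the marker comes around again.

A rough sketch of the per-pass arithmetic, for illustration only (it
uses the same mathutil.div_ceil helper that upload.py already calls):

    from allmydata.util import mathutil

    homeless_shares = range(150)      # left over after the 1-per-peer pass
    contacted_peers = 50 * ["peer"]   # stand-ins for PeerTrackers
    shares_per_peer = mathutil.div_ceil(len(homeless_shares),
                                        len(contacted_peers))
    assert shares_per_peer == 3       # every second-pass query asks for 3

The new test_four_each case (200 shares across 50 peers) checks exactly
that: each peer should see two queries and end up holding four shares.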
---
 src/allmydata/test/test_upload.py | 23 +++++++++++++++++++++++
 src/allmydata/upload.py           | 20 +++++++++++++-------
 2 files changed, 36 insertions(+), 7 deletions(-)

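To run just the new case by itself (under the usual Twisted trial
runner that the Tahoe test suite uses):

    trial allmydata.test.test_upload.PeerSelection.test_four_each
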
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index 7ea7512d..a1c32284 100644
--- a/src/allmydata/test/test_upload.py
+++ b/src/allmydata/test/test_upload.py
@@ -78,6 +78,7 @@ class FakeStorageServer:
     def __init__(self, mode):
         self.mode = mode
         self.allocated = []
+        self.queries = 0
     def callRemote(self, methname, *args, **kwargs):
         def _call():
             meth = getattr(self, methname)
@@ -89,6 +90,7 @@ class FakeStorageServer:
     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                          sharenums, share_size, canary):
         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
+        self.queries += 1
         if self.mode == "full":
             return (set(), {},)
         elif self.mode == "already got them":
@@ -304,6 +306,7 @@ class PeerSelection(unittest.TestCase):
             for p in self.node.last_peers:
                 allocated = p.ss.allocated
                 self.failUnlessEqual(len(allocated), 1)
+                self.failUnlessEqual(p.ss.queries, 1)
         d.addCallback(_check)
         return d
 
@@ -319,6 +322,7 @@ class PeerSelection(unittest.TestCase):
             for p in self.node.last_peers:
                 allocated = p.ss.allocated
                 self.failUnlessEqual(len(allocated), 2)
+                self.failUnlessEqual(p.ss.queries, 2)
         d.addCallback(_check)
         return d
 
@@ -337,14 +341,33 @@ class PeerSelection(unittest.TestCase):
                 allocated = p.ss.allocated
                 self.failUnless(len(allocated) in (1,2), len(allocated))
                 if len(allocated) == 1:
+                    self.failUnlessEqual(p.ss.queries, 1)
                     got_one.append(p)
                 else:
+                    self.failUnlessEqual(p.ss.queries, 2)
                     got_two.append(p)
             self.failUnlessEqual(len(got_one), 49)
             self.failUnlessEqual(len(got_two), 1)
         d.addCallback(_check)
         return d
 
+    def test_four_each(self):
+        # if we have 200 shares, and there are 50 peers, then each peer gets
+        # 4 shares. The design goal is to accomplish this with only two
+        # queries per peer.
+
+        data = self.get_data(SIZE_LARGE)
+        self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200)
+        d = self.u.upload_data(data)
+        d.addCallback(self._check_large, SIZE_LARGE)
+        def _check(res):
+            for p in self.node.last_peers:
+                allocated = p.ss.allocated
+                self.failUnlessEqual(len(allocated), 4)
+                self.failUnlessEqual(p.ss.queries, 2)
+        d.addCallback(_check)
+        return d
+
 
 # TODO:
 #  upload with exactly 75 peers (shares_of_happiness)
diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index 6e0a0ec0..f01ce7dc 100644
--- a/src/allmydata/upload.py
+++ b/src/allmydata/upload.py
@@ -124,7 +124,7 @@ class Tahoe2PeerSelector:
 
         self.homeless_shares = range(total_shares)
         # self.uncontacted_peers = list() # peers we haven't asked yet
-        self.contacted_peers = list() # peers worth asking again
+        self.contacted_peers = ["start"] # "start" marker + peers to ask again
         self.use_peers = set() # PeerTrackers that have shares assigned to them
         self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
 
@@ -191,13 +191,19 @@ class Tahoe2PeerSelector:
             d = peer.query(shares_to_ask)
             d.addBoth(self._got_response, peer, shares_to_ask)
             return d
-        elif self.contacted_peers:
+        elif len(self.contacted_peers) > 1:
             # ask a peer that we've already asked.
-            num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                           len(self.contacted_peers))
-            shares_to_ask = set(self.homeless_shares[:num_shares])
-            self.homeless_shares[:num_shares] = []
             peer = self.contacted_peers.pop(0)
+            if peer == "start":
+                # we're at the beginning of the list, so re-calculate
+                # shares_per_peer
+                num_shares = mathutil.div_ceil(len(self.homeless_shares),
+                                               len(self.contacted_peers))
+                self.shares_per_peer = num_shares
+                self.contacted_peers.append("start")
+                peer = self.contacted_peers.pop(0)
+            shares_to_ask = set(self.homeless_shares[:self.shares_per_peer])
+            self.homeless_shares[:self.shares_per_peer] = []
             self.query_count += 1
             d = peer.query(shares_to_ask)
             d.addBoth(self._got_response, peer, shares_to_ask)
@@ -231,7 +237,7 @@ class Tahoe2PeerSelector:
             log.msg("%s got error during peer selection: %s" % (peer, res))
             self.error_count += 1
             self.homeless_shares = list(shares_to_ask) + self.homeless_shares
-            if self.uncontacted_peers or self.contacted_peers:
+            if self.uncontacted_peers or len(self.contacted_peers) > 1:
                 # there is still hope, so just loop
                 pass
             else:
-- 
2.45.2