From 808f85158989ebc7cbcfdb6fd2716c9d9239ddf2 Mon Sep 17 00:00:00 2001 From: Brian Warner <warner@lothar.com> Date: Sun, 16 Sep 2007 17:08:34 -0700 Subject: [PATCH] upload: make peer-selection a bit more uniform. Closes #132. --- src/allmydata/test/test_upload.py | 33 ++++++++++++++++++++++---- src/allmydata/upload.py | 39 +++++++++++++++++-------------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index a1c32284..eaf2f52b 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -132,11 +132,12 @@ class FakeBucketWriter: self.closed = True class FakeClient: - def __init__(self, mode="good"): + def __init__(self, mode="good", num_servers=50): self.mode = mode + self.num_servers = num_servers def get_permuted_peers(self, storage_index, include_myself): peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),) - for fakeid in range(50) ] + for fakeid in range(self.num_servers) ] self.last_peers = [p[2] for p in peers] return peers def get_push_to_ourselves(self): @@ -276,8 +277,9 @@ class FullServer(unittest.TestCase): return d class PeerSelection(unittest.TestCase): - def setUp(self): - self.node = FakeClient(mode="good") + + def make_client(self, num_servers=50): + self.node = FakeClient(mode="good", num_servers=num_servers) self.u = upload.Uploader() self.u.running = True self.u.parent = self.node @@ -298,6 +300,7 @@ class PeerSelection(unittest.TestCase): # if we have 50 shares, and there are 50 peers, and they all accept a # share, we should get exactly one share per peer + self.make_client() data = self.get_data(SIZE_LARGE) self.u.DEFAULT_ENCODING_PARAMETERS = (25, 30, 50) d = self.u.upload_data(data) @@ -314,6 +317,7 @@ class PeerSelection(unittest.TestCase): # if we have 100 shares, and there are 50 peers, and they all accept # all shares, we should get exactly two shares per peer + self.make_client() data = self.get_data(SIZE_LARGE) self.u.DEFAULT_ENCODING_PARAMETERS = (50, 75, 100) d = self.u.upload_data(data) @@ -330,6 +334,7 @@ class PeerSelection(unittest.TestCase): # if we have 51 shares, and there are 50 peers, then one peer gets # two shares and the rest get just one + self.make_client() data = self.get_data(SIZE_LARGE) self.u.DEFAULT_ENCODING_PARAMETERS = (24, 41, 51) d = self.u.upload_data(data) @@ -356,6 +361,7 @@ class PeerSelection(unittest.TestCase): # 4 shares. The design goal is to accomplish this with only two # queries per peer. + self.make_client() data = self.get_data(SIZE_LARGE) self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200) d = self.u.upload_data(data) @@ -368,6 +374,25 @@ class PeerSelection(unittest.TestCase): d.addCallback(_check) return d + def test_three_of_ten(self): + # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than + # 4+4+2 + + self.make_client(3) + data = self.get_data(SIZE_LARGE) + self.u.DEFAULT_ENCODING_PARAMETERS = (3, 5, 10) + d = self.u.upload_data(data) + d.addCallback(self._check_large, SIZE_LARGE) + def _check(res): + counts = {} + for p in self.node.last_peers: + allocated = p.ss.allocated + counts[len(allocated)] = counts.get(len(allocated), 0) + 1 + histogram = [counts.get(i, 0) for i in range(5)] + self.failUnlessEqual(histogram, [0,0,0,2,1]) + d.addCallback(_check) + return d + # TODO: # upload with exactly 75 peers (shares_of_happiness) diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index f01ce7dc..db95901f 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -124,7 +124,8 @@ class Tahoe2PeerSelector: self.homeless_shares = range(total_shares) # self.uncontacted_peers = list() # peers we haven't asked yet - self.contacted_peers = ["start"] # peers worth asking again + self.contacted_peers = [] # peers worth asking again + self.contacted_peers2 = [] # peers that we have asked again self.use_peers = set() # PeerTrackers that have shares assigned to them self.preexisting_shares = {} # sharenum -> PeerTracker holding the share @@ -189,25 +190,27 @@ class Tahoe2PeerSelector: self.query_count += 1 self.num_peers_contacted += 1 d = peer.query(shares_to_ask) - d.addBoth(self._got_response, peer, shares_to_ask) + d.addBoth(self._got_response, peer, shares_to_ask, + self.contacted_peers) return d - elif len(self.contacted_peers) > 1: + elif self.contacted_peers: # ask a peer that we've already asked. + num_shares = mathutil.div_ceil(len(self.homeless_shares), + len(self.contacted_peers)) peer = self.contacted_peers.pop(0) - if peer == "start": - # we're at the beginning of the list, so re-calculate - # shares_per_peer - num_shares = mathutil.div_ceil(len(self.homeless_shares), - len(self.contacted_peers)) - self.shares_per_peer = num_shares - self.contacted_peers.append("start") - peer = self.contacted_peers.pop(0) - shares_to_ask = set(self.homeless_shares[:self.shares_per_peer]) - self.homeless_shares[:self.shares_per_peer] = [] + shares_to_ask = set(self.homeless_shares[:num_shares]) + self.homeless_shares[:num_shares] = [] self.query_count += 1 d = peer.query(shares_to_ask) - d.addBoth(self._got_response, peer, shares_to_ask) + d.addBoth(self._got_response, peer, shares_to_ask, + self.contacted_peers2) return d + elif self.contacted_peers2: + # we've finished the second-or-later pass. Move all the remaining + # peers back into self.contacted_peers for the next pass. + self.contacted_peers.extend(self.contacted_peers2) + self.contacted_peers[:] = [] + return self._loop() else: # no more peers. If we haven't placed enough shares, we fail. placed_shares = self.total_shares - len(self.homeless_shares) @@ -230,14 +233,16 @@ class Tahoe2PeerSelector: # we placed enough to be happy, so we're done return self.use_peers - def _got_response(self, res, peer, shares_to_ask): + def _got_response(self, res, peer, shares_to_ask, put_peer_here): if isinstance(res, failure.Failure): # This is unusual, and probably indicates a bug or a network # problem. log.msg("%s got error during peer selection: %s" % (peer, res)) self.error_count += 1 self.homeless_shares = list(shares_to_ask) + self.homeless_shares - if self.uncontacted_peers or len(self.contacted_peers) > 1: + if (self.uncontacted_peers + or self.contacted_peers + or self.contacted_peers2): # there is still hope, so just loop pass else: @@ -290,7 +295,7 @@ class Tahoe2PeerSelector: else: # if they *were* able to accept everything, they might be # willing to accept even more. - self.contacted_peers.append(peer) + put_peer_here.append(peer) # now loop return self._loop() -- 2.45.2