self.closed = True
class FakeClient:
- def __init__(self, mode="good"):
+ def __init__(self, mode="good", num_servers=50):
self.mode = mode
+ self.num_servers = num_servers
def get_permuted_peers(self, storage_index, include_myself):
peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
- for fakeid in range(50) ]
+ for fakeid in range(self.num_servers) ]
self.last_peers = [p[2] for p in peers]
return peers
def get_push_to_ourselves(self):
return d
class PeerSelection(unittest.TestCase):
- def setUp(self):
- self.node = FakeClient(mode="good")
+
+ def make_client(self, num_servers=50):
+ self.node = FakeClient(mode="good", num_servers=num_servers)
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
# if we have 50 shares, and there are 50 peers, and they all accept a
# share, we should get exactly one share per peer
+ self.make_client()
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (25, 30, 50)
d = self.u.upload_data(data)
# if we have 100 shares, and there are 50 peers, and they all accept
# all shares, we should get exactly two shares per peer
+ self.make_client()
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (50, 75, 100)
d = self.u.upload_data(data)
# if we have 51 shares, and there are 50 peers, then one peer gets
# two shares and the rest get just one
+ self.make_client()
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (24, 41, 51)
d = self.u.upload_data(data)
# 4 shares. The design goal is to accomplish this with only two
# queries per peer.
+ self.make_client()
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200)
d = self.u.upload_data(data)
d.addCallback(_check)
return d
+ def test_three_of_ten(self):
+ # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
+ # 4+4+2
+
+ self.make_client(3)
+ data = self.get_data(SIZE_LARGE)
+ self.u.DEFAULT_ENCODING_PARAMETERS = (3, 5, 10)
+ d = self.u.upload_data(data)
+ d.addCallback(self._check_large, SIZE_LARGE)
+ def _check(res):
+ counts = {}
+ for p in self.node.last_peers:
+ allocated = p.ss.allocated
+ counts[len(allocated)] = counts.get(len(allocated), 0) + 1
+ histogram = [counts.get(i, 0) for i in range(5)]
+ self.failUnlessEqual(histogram, [0,0,0,2,1])
+ d.addCallback(_check)
+ return d
+
# TODO:
# upload with exactly 75 peers (shares_of_happiness)
self.homeless_shares = range(total_shares)
# self.uncontacted_peers = list() # peers we haven't asked yet
- self.contacted_peers = ["start"] # peers worth asking again
+ self.contacted_peers = [] # peers worth asking again
+ self.contacted_peers2 = [] # peers that we have asked again
self.use_peers = set() # PeerTrackers that have shares assigned to them
self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
self.query_count += 1
self.num_peers_contacted += 1
d = peer.query(shares_to_ask)
- d.addBoth(self._got_response, peer, shares_to_ask)
+ d.addBoth(self._got_response, peer, shares_to_ask,
+ self.contacted_peers)
return d
- elif len(self.contacted_peers) > 1:
+ elif self.contacted_peers:
# ask a peer that we've already asked.
+ num_shares = mathutil.div_ceil(len(self.homeless_shares),
+ len(self.contacted_peers))
peer = self.contacted_peers.pop(0)
- if peer == "start":
- # we're at the beginning of the list, so re-calculate
- # shares_per_peer
- num_shares = mathutil.div_ceil(len(self.homeless_shares),
- len(self.contacted_peers))
- self.shares_per_peer = num_shares
- self.contacted_peers.append("start")
- peer = self.contacted_peers.pop(0)
- shares_to_ask = set(self.homeless_shares[:self.shares_per_peer])
- self.homeless_shares[:self.shares_per_peer] = []
+ shares_to_ask = set(self.homeless_shares[:num_shares])
+ self.homeless_shares[:num_shares] = []
self.query_count += 1
d = peer.query(shares_to_ask)
- d.addBoth(self._got_response, peer, shares_to_ask)
+ d.addBoth(self._got_response, peer, shares_to_ask,
+ self.contacted_peers2)
return d
+ elif self.contacted_peers2:
+ # we've finished the second-or-later pass. Move all the remaining
+ # peers back into self.contacted_peers for the next pass.
+ self.contacted_peers.extend(self.contacted_peers2)
+ self.contacted_peers[:] = []
+ return self._loop()
else:
# no more peers. If we haven't placed enough shares, we fail.
placed_shares = self.total_shares - len(self.homeless_shares)
# we placed enough to be happy, so we're done
return self.use_peers
- def _got_response(self, res, peer, shares_to_ask):
+ def _got_response(self, res, peer, shares_to_ask, put_peer_here):
if isinstance(res, failure.Failure):
# This is unusual, and probably indicates a bug or a network
# problem.
log.msg("%s got error during peer selection: %s" % (peer, res))
self.error_count += 1
self.homeless_shares = list(shares_to_ask) + self.homeless_shares
- if self.uncontacted_peers or len(self.contacted_peers) > 1:
+ if (self.uncontacted_peers
+ or self.contacted_peers
+ or self.contacted_peers2):
# there is still hope, so just loop
pass
else:
else:
# if they *were* able to accept everything, they might be
# willing to accept even more.
- self.contacted_peers.append(peer)
+ put_peer_here.append(peer)
# now loop
return self._loop()