From: Brian Warner Date: Sun, 16 Sep 2007 08:53:00 +0000 (-0700) Subject: peer-selection: if we must loop, send a minimal number of queries (by asking for... X-Git-Tag: allmydata-tahoe-0.6.0~90 X-Git-Url: https://git.rkrishnan.org/COPYING.TGPPL.html?a=commitdiff_plain;h=24e6ccddce2f3d3c4153946fbbc753c7e4884ba0;p=tahoe-lafs%2Ftahoe-lafs.git peer-selection: if we must loop, send a minimal number of queries (by asking for more than one share per peer on the second pass) --- diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 7ea7512d..a1c32284 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -78,6 +78,7 @@ class FakeStorageServer: def __init__(self, mode): self.mode = mode self.allocated = [] + self.queries = 0 def callRemote(self, methname, *args, **kwargs): def _call(): meth = getattr(self, methname) @@ -89,6 +90,7 @@ class FakeStorageServer: def allocate_buckets(self, storage_index, renew_secret, cancel_secret, sharenums, share_size, canary): #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size) + self.queries += 1 if self.mode == "full": return (set(), {},) elif self.mode == "already got them": @@ -304,6 +306,7 @@ class PeerSelection(unittest.TestCase): for p in self.node.last_peers: allocated = p.ss.allocated self.failUnlessEqual(len(allocated), 1) + self.failUnlessEqual(p.ss.queries, 1) d.addCallback(_check) return d @@ -319,6 +322,7 @@ class PeerSelection(unittest.TestCase): for p in self.node.last_peers: allocated = p.ss.allocated self.failUnlessEqual(len(allocated), 2) + self.failUnlessEqual(p.ss.queries, 2) d.addCallback(_check) return d @@ -337,14 +341,33 @@ class PeerSelection(unittest.TestCase): allocated = p.ss.allocated self.failUnless(len(allocated) in (1,2), len(allocated)) if len(allocated) == 1: + self.failUnlessEqual(p.ss.queries, 1) got_one.append(p) else: + self.failUnlessEqual(p.ss.queries, 2) got_two.append(p) self.failUnlessEqual(len(got_one), 49) self.failUnlessEqual(len(got_two), 1) d.addCallback(_check) return d + def test_four_each(self): + # if we have 200 shares, and there are 50 peers, then each peer gets + # 4 shares. The design goal is to accomplish this with only two + # queries per peer. + + data = self.get_data(SIZE_LARGE) + self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200) + d = self.u.upload_data(data) + d.addCallback(self._check_large, SIZE_LARGE) + def _check(res): + for p in self.node.last_peers: + allocated = p.ss.allocated + self.failUnlessEqual(len(allocated), 4) + self.failUnlessEqual(p.ss.queries, 2) + d.addCallback(_check) + return d + # TODO: # upload with exactly 75 peers (shares_of_happiness) diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 6e0a0ec0..f01ce7dc 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -124,7 +124,7 @@ class Tahoe2PeerSelector: self.homeless_shares = range(total_shares) # self.uncontacted_peers = list() # peers we haven't asked yet - self.contacted_peers = list() # peers worth asking again + self.contacted_peers = ["start"] # peers worth asking again self.use_peers = set() # PeerTrackers that have shares assigned to them self.preexisting_shares = {} # sharenum -> PeerTracker holding the share @@ -191,13 +191,19 @@ class Tahoe2PeerSelector: d = peer.query(shares_to_ask) d.addBoth(self._got_response, peer, shares_to_ask) return d - elif self.contacted_peers: + elif len(self.contacted_peers) > 1: # ask a peer that we've already asked. - num_shares = mathutil.div_ceil(len(self.homeless_shares), - len(self.contacted_peers)) - shares_to_ask = set(self.homeless_shares[:num_shares]) - self.homeless_shares[:num_shares] = [] peer = self.contacted_peers.pop(0) + if peer == "start": + # we're at the beginning of the list, so re-calculate + # shares_per_peer + num_shares = mathutil.div_ceil(len(self.homeless_shares), + len(self.contacted_peers)) + self.shares_per_peer = num_shares + self.contacted_peers.append("start") + peer = self.contacted_peers.pop(0) + shares_to_ask = set(self.homeless_shares[:self.shares_per_peer]) + self.homeless_shares[:self.shares_per_peer] = [] self.query_count += 1 d = peer.query(shares_to_ask) d.addBoth(self._got_response, peer, shares_to_ask) @@ -231,7 +237,7 @@ class Tahoe2PeerSelector: log.msg("%s got error during peer selection: %s" % (peer, res)) self.error_count += 1 self.homeless_shares = list(shares_to_ask) + self.homeless_shares - if self.uncontacted_peers or self.contacted_peers: + if self.uncontacted_peers or len(self.contacted_peers) > 1: # there is still hope, so just loop pass else: