From 320582be5a553b37554bf58c84373314989a0d42 Mon Sep 17 00:00:00 2001 From: Kevan Carstensen Date: Tue, 17 Nov 2009 19:45:42 -0700 Subject: [PATCH] Eliminate overcounting iof servers_of_happiness in Tahoe2PeerSelector; also reorganize some things. --- src/allmydata/immutable/upload.py | 67 ++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 3477ed5f..1e8289ec 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -138,8 +138,21 @@ class PeerTracker: return (alreadygot, set(b.keys())) def servers_with_unique_shares(existing_shares, used_peers=None): + """ + I accept a dict of shareid -> peerid mappings (and optionally a list + of PeerTracker instances) and return a list of servers that have shares. + """ servers = [] + existing_shares = existing_shares.copy() if used_peers: + peerdict = {} + for peer in used_peers: + peerdict.update(dict([(i, peer.peerid) for i in peer.buckets])) + for k in peerdict.keys(): + if existing_shares.has_key(k): + # Prevent overcounting; favor the bucket, and not the + # prexisting share. + del(existing_shares[k]) peers = list(used_peers.copy()) # We do this because the preexisting shares list goes by peerid. peers = [x.peerid for x in peers] @@ -148,12 +161,29 @@ def servers_with_unique_shares(existing_shares, used_peers=None): return list(set(servers)) def shares_by_server(existing_shares): + """ + I accept a dict of shareid -> peerid mappings, and return a dict + of peerid -> shareid mappings + """ servers = {} for server in set(existing_shares.values()): servers[server] = set([x for x in existing_shares.keys() if existing_shares[x] == server]) return servers +def should_add_server(existing_shares, server, bucket): + """ + I tell my caller whether the servers_of_happiness number will be + increased or decreased if a particular server is added as the peer + already holding a particular share. I take a dictionary, a peerid, + and a bucket as arguments, and return a boolean. + """ + old_size = len(servers_with_unique_shares(existing_shares)) + new_candidate = existing_shares.copy() + new_candidate[bucket] = server + new_size = len(servers_with_unique_shares(new_candidate)) + return old_size < new_size + class Tahoe2PeerSelector: def __init__(self, upload_id, logparent=None, upload_status=None): @@ -261,14 +291,15 @@ class Tahoe2PeerSelector: peer = self.readonly_peers.pop() assert isinstance(peer, PeerTracker) d = peer.query_allocated() - d.addCallback(self._handle_allocate_response) + d.addCallback(self._handle_existing_response) return d - def _handle_allocate_response(self, (peer, buckets)): + def _handle_existing_response(self, (peer, buckets)): for bucket in buckets: - self.preexisting_shares[bucket] = peer - if self.homeless_shares: - self.homeless_shares.remove(bucket) + if should_add_server(self.preexisting_shares, peer, bucket): + self.preexisting_shares[bucket] = peer + if self.homeless_shares and bucket in self.homeless_shares: + self.homeless_shares.remove(bucket) return self._existing_shares() def _loop(self): @@ -312,10 +343,10 @@ class Tahoe2PeerSelector: items.append((servernum, sharelist)) return self._loop() else: - raise NotEnoughSharesError("shares could only be placed on %d " - "servers (%d were requested)" % - (len(effective_happiness), - self.servers_of_happiness)) + raise NotEnoughSharesError("shares could only be placed " + "on %d servers (%d were requested)" % + (len(effective_happiness), + self.servers_of_happiness)) if self.uncontacted_peers: peer = self.uncontacted_peers.pop(0) @@ -391,7 +422,7 @@ class Tahoe2PeerSelector: # we placed enough to be happy, so we're done if self._status: self._status.set_status("Placed all shares") - return self.use_peers + return (self.use_peers, self.preexisting_shares) def _got_response(self, res, peer, shares_to_ask, put_peer_here): if isinstance(res, failure.Failure): @@ -422,16 +453,12 @@ class Tahoe2PeerSelector: level=log.NOISY, parent=self._log_parent) progress = False for s in alreadygot: - if self.preexisting_shares.has_key(s): - old_size = len(servers_with_unique_shares(self.preexisting_shares)) - new_candidate = self.preexisting_shares.copy() - new_candidate[s] = peer.peerid - new_size = len(servers_with_unique_shares(new_candidate)) - if old_size >= new_size: continue - self.preexisting_shares[s] = peer.peerid - if s in self.homeless_shares: - self.homeless_shares.remove(s) - progress = True + if should_add_server(self.preexisting_shares, + peer.peerid, s): + self.preexisting_shares[s] = peer.peerid + if s in self.homeless_shares: + self.homeless_shares.remove(s) + progress = True # the PeerTracker will remember which shares were allocated on # that peer. We just have to remember to use them. -- 2.45.2