return (alreadygot, set(b.keys()))
def servers_with_unique_shares(existing_shares, used_peers=None):
+ """
+ I accept a dict of shareid -> peerid mappings (and optionally a list
+ of PeerTracker instances) and return a list of servers that have shares.
+ """
servers = []
+ existing_shares = existing_shares.copy()
if used_peers:
+ peerdict = {}
+ for peer in used_peers:
+ peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
+ for k in peerdict.keys():
+ if existing_shares.has_key(k):
+ # Prevent overcounting; favor the bucket, and not the
+ # prexisting share.
+ del(existing_shares[k])
peers = list(used_peers.copy())
# We do this because the preexisting shares list goes by peerid.
peers = [x.peerid for x in peers]
return list(set(servers))
def shares_by_server(existing_shares):
+ """
+ I accept a dict of shareid -> peerid mappings, and return a dict
+ of peerid -> shareid mappings
+ """
servers = {}
for server in set(existing_shares.values()):
servers[server] = set([x for x in existing_shares.keys()
if existing_shares[x] == server])
return servers
+def should_add_server(existing_shares, server, bucket):
+ """
+ I tell my caller whether the servers_of_happiness number will be
+ increased or decreased if a particular server is added as the peer
+ already holding a particular share. I take a dictionary, a peerid,
+ and a bucket as arguments, and return a boolean.
+ """
+ old_size = len(servers_with_unique_shares(existing_shares))
+ new_candidate = existing_shares.copy()
+ new_candidate[bucket] = server
+ new_size = len(servers_with_unique_shares(new_candidate))
+ return old_size < new_size
+
class Tahoe2PeerSelector:
def __init__(self, upload_id, logparent=None, upload_status=None):
peer = self.readonly_peers.pop()
assert isinstance(peer, PeerTracker)
d = peer.query_allocated()
- d.addCallback(self._handle_allocate_response)
+ d.addCallback(self._handle_existing_response)
return d
- def _handle_allocate_response(self, (peer, buckets)):
+ def _handle_existing_response(self, (peer, buckets)):
for bucket in buckets:
- self.preexisting_shares[bucket] = peer
- if self.homeless_shares:
- self.homeless_shares.remove(bucket)
+ if should_add_server(self.preexisting_shares, peer, bucket):
+ self.preexisting_shares[bucket] = peer
+ if self.homeless_shares and bucket in self.homeless_shares:
+ self.homeless_shares.remove(bucket)
return self._existing_shares()
def _loop(self):
items.append((servernum, sharelist))
return self._loop()
else:
- raise NotEnoughSharesError("shares could only be placed on %d "
- "servers (%d were requested)" %
- (len(effective_happiness),
- self.servers_of_happiness))
+ raise NotEnoughSharesError("shares could only be placed "
+ "on %d servers (%d were requested)" %
+ (len(effective_happiness),
+ self.servers_of_happiness))
if self.uncontacted_peers:
peer = self.uncontacted_peers.pop(0)
# we placed enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
- return self.use_peers
+ return (self.use_peers, self.preexisting_shares)
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
if isinstance(res, failure.Failure):
level=log.NOISY, parent=self._log_parent)
progress = False
for s in alreadygot:
- if self.preexisting_shares.has_key(s):
- old_size = len(servers_with_unique_shares(self.preexisting_shares))
- new_candidate = self.preexisting_shares.copy()
- new_candidate[s] = peer.peerid
- new_size = len(servers_with_unique_shares(new_candidate))
- if old_size >= new_size: continue
- self.preexisting_shares[s] = peer.peerid
- if s in self.homeless_shares:
- self.homeless_shares.remove(s)
- progress = True
+ if should_add_server(self.preexisting_shares,
+ peer.peerid, s):
+ self.preexisting_shares[s] = peer.peerid
+ if s in self.homeless_shares:
+ self.homeless_shares.remove(s)
+ progress = True
# the PeerTracker will remember which shares were allocated on
# that peer. We just have to remember to use them.