assert not self._codec
k, happy, n, segsize = params
self.required_shares = k
- self.shares_of_happiness = happy
+ self.servers_of_happiness = happy
self.num_shares = n
self.segment_size = segsize
self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
if name == "storage_index":
return self._storage_index
elif name == "share_counts":
- return (self.required_shares, self.shares_of_happiness,
+ return (self.required_shares, self.servers_of_happiness,
self.num_shares)
elif name == "num_segments":
return self.num_segments
else:
raise KeyError("unknown parameter name '%s'" % name)
- def set_shareholders(self, landlords):
+ def set_shareholders(self, landlords, servermap):
assert isinstance(landlords, dict)
for k in landlords:
assert IStorageBucketWriter.providedBy(landlords[k])
self.landlords = landlords.copy()
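+ # servermap maps each share number (shnum) to the peerid of the
+ # server that will hold that share; the encoder uses it to count
+ # distinct remaining servers when a shareholder is lost mid-upload.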
+ assert isinstance(servermap, dict)
+ self.servermap = servermap.copy()
def start(self):
""" Returns a Deferred that will fire with the verify cap (an instance of
# even more UNUSUAL
self.log("they weren't in our list of landlords", parent=ln,
level=log.WEIRD, umid="TQGFRw")
- if len(self.landlords) < self.shares_of_happiness:
- msg = "lost too many shareholders during upload (still have %d, want %d): %s" % \
- (len(self.landlords), self.shares_of_happiness, why)
- if self.landlords:
+ del self.servermap[shareid]
+ servers_left = list(set(self.servermap.values()))
+ if len(servers_left) < self.servers_of_happiness:
+ msg = "lost too many servers during upload (still have %d, want %d): %s" % \
+ (len(servers_left),
+ self.servers_of_happiness, why)
+ if servers_left:
raise NotEnoughSharesError(msg)
else:
raise NoSharesError(msg)
self.log("but we can still continue with %s shares, we'll be happy "
- "with at least %s" % (len(self.landlords),
- self.shares_of_happiness),
+ "with at least %s" % (len(servers_left),
+ self.servers_of_happiness),
parent=ln)
def _gather_responses(self, dl):
self.buckets.update(b)
return (alreadygot, set(b.keys()))
+def servers_with_unique_shares(existing_shares, used_peers=None):
+    """Return the distinct peerids that hold (or will hold) at least one
+    share: the values of existing_shares (a dict mapping shnum to peerid)
+    plus the peerids of any PeerTracker instances in used_peers."""
+    servers = []
+    if used_peers:
+        # Reduce the PeerTracker instances to peerids, to match the
+        # values in existing_shares.
+        servers.extend([peer.peerid for peer in used_peers])
+    servers.extend(existing_shares.values())
+    return list(set(servers))
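+# Illustrative example: servers_with_unique_shares({0: 'A', 1: 'A', 2: 'B'})
+# returns ['A', 'B'] in some order, since the result passes through a set.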
+
+def shares_by_server(existing_shares):
+    """Invert existing_shares (a dict mapping shnum to peerid) into a
+    dict mapping each peerid to the set of shnums it holds."""
+    servers = {}
+    for shnum, server in existing_shares.items():
+        servers.setdefault(server, set()).add(shnum)
+    return servers
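+# Illustrative example: shares_by_server({0: 'A', 1: 'A', 2: 'B'})
+# returns {'A': set([0, 1]), 'B': set([2])}.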
+
class Tahoe2PeerSelector:
def __init__(self, upload_id, logparent=None, upload_status=None):
def get_shareholders(self, storage_broker, secret_holder,
storage_index, share_size, block_size,
- num_segments, total_shares, shares_of_happiness):
+ num_segments, total_shares, servers_of_happiness):
"""
@return: (used_peers, already_peers), where used_peers is a set of
PeerTracker instances that have agreed to hold some shares
self._status.set_status("Contacting Peers..")
self.total_shares = total_shares
- self.shares_of_happiness = shares_of_happiness
+ self.servers_of_happiness = servers_of_happiness
self.homeless_shares = range(total_shares)
# self.uncontacted_peers = list() # peers we haven't asked yet
d = defer.maybeDeferred(self._loop)
return d
+
def _loop(self):
if not self.homeless_shares:
- # all done
- msg = ("placed all %d shares, "
- "sent %d queries to %d peers, "
- "%d queries placed some shares, %d placed none, "
- "got %d errors" %
- (self.total_shares,
- self.query_count, self.num_peers_contacted,
- self.good_query_count, self.bad_query_count,
- self.error_count))
- log.msg("peer selection successful for %s: %s" % (self, msg),
+ effective_happiness = servers_with_unique_shares(
+ self.preexisting_shares,
+ self.use_peers)
+ if len(effective_happiness) >= self.servers_of_happiness:
+ msg = ("placed all %d shares, "
+ "sent %d queries to %d peers, "
+ "%d queries placed some shares, %d placed none, "
+ "got %d errors" %
+ (self.total_shares,
+ self.query_count, self.num_peers_contacted,
+ self.good_query_count, self.bad_query_count,
+ self.error_count))
+ log.msg("peer selection successful for %s: %s" % (self, msg),
parent=self._log_parent)
- return (self.use_peers, self.preexisting_shares)
+ return (self.use_peers, self.preexisting_shares)
+ else:
+ delta = self.servers_of_happiness - len(effective_happiness)
+ shares = shares_by_server(self.preexisting_shares)
+ # Each server in shares maps to a set of shares stored on it.
+ # Since we want to keep at least one share on each server
+ # that has one (otherwise we'd only be making
+ # the situation worse by removing distinct servers),
+ # each server has len(its shares) - 1 to spread around.
+ shares_to_spread = sum([len(sharelist) - 1
+                         for sharelist in shares.values()])
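+ # Illustrative example: shares = {'A': set([0, 1, 2]), 'B': set([3])}
+ # gives shares_to_spread = (3 - 1) + (1 - 1) = 2: two shares could
+ # move to fresh servers without leaving 'A' or 'B' empty.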
+ if delta <= len(self.uncontacted_peers) and \
+ shares_to_spread >= delta:
+ # Loop through the allocated shares, removing a share from any
+ # server that holds more than one, until enough shares are
+ # homeless to be placed on new servers. Servers left holding a
+ # single share are dropped from consideration, so the loop
+ # terminates once delta shares have been freed (guaranteed by
+ # the shares_to_spread check above).
+ items = shares.items()
+ while len(self.homeless_shares) < delta:
+     servernum, sharelist = items.pop()
+     if len(sharelist) > 1:
+         share = sharelist.pop()
+         self.homeless_shares.append(share)
+         del self.preexisting_shares[share]
+         items.append((servernum, sharelist))
+ return self._loop()
+ else:
+ raise NotEnoughSharesError("shares could only be placed on %d "
+ "servers (%d were requested)" %
+ (len(effective_happiness),
+ self.servers_of_happiness))
if self.uncontacted_peers:
peer = self.uncontacted_peers.pop(0)
else:
# no more peers. If we haven't placed enough shares, we fail.
placed_shares = self.total_shares - len(self.homeless_shares)
- if placed_shares < self.shares_of_happiness:
+ effective_happiness = servers_with_unique_shares(
+ self.preexisting_shares,
+ self.use_peers)
+ if len(effective_happiness) < self.servers_of_happiness:
msg = ("placed %d shares out of %d total (%d homeless), "
- "want to place %d, "
+ "want to place on %d servers, "
"sent %d queries to %d peers, "
"%d queries placed some shares, %d placed none, "
"got %d errors" %
(self.total_shares - len(self.homeless_shares),
self.total_shares, len(self.homeless_shares),
- self.shares_of_happiness,
+ self.servers_of_happiness,
self.query_count, self.num_peers_contacted,
self.good_query_count, self.bad_query_count,
self.error_count))
level=log.NOISY, parent=self._log_parent)
progress = False
for s in alreadygot:
+ if s in self.preexisting_shares:
+     # Only reassign share s to this peer if doing so would
+     # increase the number of distinct servers holding shares.
+     old_size = len(servers_with_unique_shares(self.preexisting_shares))
+     new_candidate = self.preexisting_shares.copy()
+     new_candidate[s] = peer.peerid
+     new_size = len(servers_with_unique_shares(new_candidate))
+     if old_size >= new_size:
+         continue
self.preexisting_shares[s] = peer.peerid
if s in self.homeless_shares:
self.homeless_shares.remove(s)
for peer in used_peers:
assert isinstance(peer, PeerTracker)
buckets = {}
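+ # Build a servermap (shnum -> peerid) for the encoder, seeded with
+ # the share placements that existed before this upload began.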
+ servermap = already_peers.copy()
for peer in used_peers:
buckets.update(peer.buckets)
for shnum in peer.buckets:
self._peer_trackers[shnum] = peer
+ servermap[shnum] = peer.peerid
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
- encoder.set_shareholders(buckets)
+ encoder.set_shareholders(buckets, servermap)
def _encrypted_done(self, verifycap):
""" Returns a Deferred that will fire with the UploadResults instance. """