From 8c71df53f9e4632ad9f0c972be33f46165f8b507 Mon Sep 17 00:00:00 2001 From: Kevan Carstensen Date: Wed, 4 Nov 2009 05:12:22 -0700 Subject: [PATCH] Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness instead of shares_of_happiness. --- src/allmydata/immutable/encode.py | 23 +++++--- src/allmydata/immutable/upload.py | 94 +++++++++++++++++++++++++------ 2 files changed, 91 insertions(+), 26 deletions(-) diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py index bb0b79ee..c3ff0d21 100644 --- a/src/allmydata/immutable/encode.py +++ b/src/allmydata/immutable/encode.py @@ -118,7 +118,7 @@ class Encoder(object): assert not self._codec k, happy, n, segsize = params self.required_shares = k - self.shares_of_happiness = happy + self.servers_of_happiness = happy self.num_shares = n self.segment_size = segsize self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize)) @@ -176,7 +176,7 @@ class Encoder(object): if name == "storage_index": return self._storage_index elif name == "share_counts": - return (self.required_shares, self.shares_of_happiness, + return (self.required_shares, self.servers_of_happiness, self.num_shares) elif name == "num_segments": return self.num_segments @@ -191,11 +191,13 @@ class Encoder(object): else: raise KeyError("unknown parameter name '%s'" % name) - def set_shareholders(self, landlords): + def set_shareholders(self, landlords, servermap): assert isinstance(landlords, dict) for k in landlords: assert IStorageBucketWriter.providedBy(landlords[k]) self.landlords = landlords.copy() + assert isinstance(servermap, dict) + self.servermap = servermap.copy() def start(self): """ Returns a Deferred that will fire with the verify cap (an instance of @@ -486,16 +488,19 @@ class Encoder(object): # even more UNUSUAL self.log("they weren't in our list of landlords", parent=ln, level=log.WEIRD, umid="TQGFRw") - if len(self.landlords) < self.shares_of_happiness: - msg = "lost too many shareholders during upload (still have %d, want %d): %s" % \ - (len(self.landlords), self.shares_of_happiness, why) - if self.landlords: + del(self.servermap[shareid]) + servers_left = list(set(self.servermap.values())) + if len(servers_left) < self.servers_of_happiness: + msg = "lost too many servers during upload (still have %d, want %d): %s" % \ + (len(servers_left), + self.servers_of_happiness, why) + if servers_left: raise NotEnoughSharesError(msg) else: raise NoSharesError(msg) self.log("but we can still continue with %s shares, we'll be happy " - "with at least %s" % (len(self.landlords), - self.shares_of_happiness), + "with at least %s" % (len(servers_left), + self.servers_of_happiness), parent=ln) def _gather_responses(self, dl): diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 72826994..00efceaa 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -128,6 +128,23 @@ class PeerTracker: self.buckets.update(b) return (alreadygot, set(b.keys())) +def servers_with_unique_shares(existing_shares, used_peers=None): + servers = [] + if used_peers: + peers = list(used_peers.copy()) + # We do this because the preexisting shares list goes by peerid. + peers = [x.peerid for x in peers] + servers.extend(peers) + servers.extend(existing_shares.values()) + return list(set(servers)) + +def shares_by_server(existing_shares): + servers = {} + for server in set(existing_shares.values()): + servers[server] = set([x for x in existing_shares.keys() + if existing_shares[x] == server]) + return servers + class Tahoe2PeerSelector: def __init__(self, upload_id, logparent=None, upload_status=None): @@ -144,7 +161,7 @@ class Tahoe2PeerSelector: def get_shareholders(self, storage_broker, secret_holder, storage_index, share_size, block_size, - num_segments, total_shares, shares_of_happiness): + num_segments, total_shares, servers_of_happiness): """ @return: (used_peers, already_peers), where used_peers is a set of PeerTracker instances that have agreed to hold some shares @@ -157,7 +174,7 @@ class Tahoe2PeerSelector: self._status.set_status("Contacting Peers..") self.total_shares = total_shares - self.shares_of_happiness = shares_of_happiness + self.servers_of_happiness = servers_of_happiness self.homeless_shares = range(total_shares) # self.uncontacted_peers = list() # peers we haven't asked yet @@ -222,20 +239,52 @@ class Tahoe2PeerSelector: d = defer.maybeDeferred(self._loop) return d + def _loop(self): if not self.homeless_shares: - # all done - msg = ("placed all %d shares, " - "sent %d queries to %d peers, " - "%d queries placed some shares, %d placed none, " - "got %d errors" % - (self.total_shares, - self.query_count, self.num_peers_contacted, - self.good_query_count, self.bad_query_count, - self.error_count)) - log.msg("peer selection successful for %s: %s" % (self, msg), + effective_happiness = servers_with_unique_shares( + self.preexisting_shares, + self.use_peers) + if self.servers_of_happiness <= len(effective_happiness): + msg = ("placed all %d shares, " + "sent %d queries to %d peers, " + "%d queries placed some shares, %d placed none, " + "got %d errors" % + (self.total_shares, + self.query_count, self.num_peers_contacted, + self.good_query_count, self.bad_query_count, + self.error_count)) + log.msg("peer selection successful for %s: %s" % (self, msg), parent=self._log_parent) - return (self.use_peers, self.preexisting_shares) + return (self.use_peers, self.preexisting_shares) + else: + delta = self.servers_of_happiness - len(effective_happiness) + shares = shares_by_server(self.preexisting_shares) + # Each server in shares maps to a set of shares stored on it. + # Since we want to keep at least one share on each server + # that has one (otherwise we'd only be making + # the situation worse by removing distinct servers), + # each server has len(its shares) - 1 to spread around. + shares_to_spread = sum([len(list(sharelist)) - 1 + for (server, sharelist) + in shares.items()]) + if delta <= len(self.uncontacted_peers) and \ + shares_to_spread >= delta: + # Loop through the allocated shares, removing + items = shares.items() + while len(self.homeless_shares) < delta: + servernum, sharelist = items.pop() + if len(sharelist) > 1: + share = sharelist.pop() + self.homeless_shares.append(share) + del(self.preexisting_shares[share]) + items.append((servernum, sharelist)) + return self._loop() + else: + raise NotEnoughSharesError("shares could only be placed on %d " + "servers (%d were requested)" % + (len(effective_happiness), + self.servers_of_happiness)) if self.uncontacted_peers: peer = self.uncontacted_peers.pop(0) @@ -284,15 +333,18 @@ class Tahoe2PeerSelector: else: # no more peers. If we haven't placed enough shares, we fail. placed_shares = self.total_shares - len(self.homeless_shares) - if placed_shares < self.shares_of_happiness: + effective_happiness = servers_with_unique_shares( + self.preexisting_shares, + self.use_peers) + if len(effective_happiness) < self.servers_of_happiness: msg = ("placed %d shares out of %d total (%d homeless), " - "want to place %d, " + "want to place on %d servers, " "sent %d queries to %d peers, " "%d queries placed some shares, %d placed none, " "got %d errors" % (self.total_shares - len(self.homeless_shares), self.total_shares, len(self.homeless_shares), - self.shares_of_happiness, + self.servers_of_happiness, self.query_count, self.num_peers_contacted, self.good_query_count, self.bad_query_count, self.error_count)) @@ -339,6 +391,12 @@ class Tahoe2PeerSelector: level=log.NOISY, parent=self._log_parent) progress = False for s in alreadygot: + if self.preexisting_shares.has_key(s): + old_size = len(servers_with_unique_shares(self.preexisting_shares)) + new_candidate = self.preexisting_shares.copy() + new_candidate[s] = peer.peerid + new_size = len(servers_with_unique_shares(new_candidate)) + if old_size >= new_size: continue self.preexisting_shares[s] = peer.peerid if s in self.homeless_shares: self.homeless_shares.remove(s) @@ -764,12 +822,14 @@ class CHKUploader: for peer in used_peers: assert isinstance(peer, PeerTracker) buckets = {} + servermap = already_peers.copy() for peer in used_peers: buckets.update(peer.buckets) for shnum in peer.buckets: self._peer_trackers[shnum] = peer + servermap[shnum] = peer.peerid assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]) - encoder.set_shareholders(buckets) + encoder.set_shareholders(buckets, servermap) def _encrypted_done(self, verifycap): """ Returns a Deferred that will fire with the UploadResults instance. """ -- 2.45.2