From: Brian Warner Date: Sun, 27 Feb 2011 02:11:00 +0000 (-0700) Subject: refactor: s/peer/server/ in immutable/upload, happinessutil.py, test_upload X-Git-Url: https://git.rkrishnan.org/Site/Content/Exhibitors/class-simplejson.JSONDecoder.html?a=commitdiff_plain;h=ebfcb649f9c3d2400a75bf23acc653791ccafb0e;p=tahoe-lafs%2Ftahoe-lafs.git refactor: s/peer/server/ in immutable/upload, happinessutil.py, test_upload No behavioral changes, just updating variable/method names and log messages. The effects outside these three files should be minimal: some exception messages changed (to say "server" instead of "peer"), and some internal class names were changed. A few things still use "peer" to minimize external changes, like UploadResults.timings["peer_selection"] and happinessutil.merge_peers, which can be changed later. --- diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 66bf3c4e..2758520b 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -68,14 +68,14 @@ EXTENSION_SIZE = 1000 def pretty_print_shnum_to_servers(s): return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ]) -class PeerTracker: - def __init__(self, peerid, storage_server, +class ServerTracker: + def __init__(self, serverid, storage_server, sharesize, blocksize, num_segments, num_share_hashes, storage_index, bucket_renewal_secret, bucket_cancel_secret): - precondition(isinstance(peerid, str), peerid) - precondition(len(peerid) == 20, peerid) - self.peerid = peerid + precondition(isinstance(serverid, str), serverid) + precondition(len(serverid) == 20, serverid) + self.serverid = serverid self._storageserver = storage_server # to an RIStorageServer self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize @@ -83,7 +83,7 @@ class PeerTracker: wbp = layout.make_write_bucket_proxy(None, sharesize, blocksize, num_segments, num_share_hashes, - EXTENSION_SIZE, peerid) + EXTENSION_SIZE, serverid) self.wbp_class = wbp.__class__ # to create more of them self.allocated_size = wbp.get_allocated_size() self.blocksize = blocksize @@ -95,8 +95,8 @@ class PeerTracker: self.cancel_secret = bucket_cancel_secret def __repr__(self): - return ("" - % (idlib.shortnodeid_b2a(self.peerid), + return ("" + % (idlib.shortnodeid_b2a(self.serverid), si_b2a(self.storage_index)[:5])) def query(self, sharenums): @@ -123,7 +123,7 @@ class PeerTracker: self.num_segments, self.num_share_hashes, EXTENSION_SIZE, - self.peerid) + self.serverid) b[sharenum] = bp self.buckets.update(b) return (alreadygot, set(b.keys())) @@ -149,58 +149,59 @@ class PeerTracker: def str_shareloc(shnum, bucketwriter): return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),) -class Tahoe2PeerSelector(log.PrefixingLogMixin): +class Tahoe2ServerSelector(log.PrefixingLogMixin): def __init__(self, upload_id, logparent=None, upload_status=None): self.upload_id = upload_id self.query_count, self.good_query_count, self.bad_query_count = 0,0,0 - # Peers that are working normally, but full. + # Servers that are working normally, but full. self.full_count = 0 self.error_count = 0 - self.num_peers_contacted = 0 + self.num_servers_contacted = 0 self.last_failure_msg = None self._status = IUploadStatus(upload_status) log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id) self.log("starting", level=log.OPERATIONAL) def __repr__(self): - return "" % self.upload_id + return "" % self.upload_id def get_shareholders(self, storage_broker, secret_holder, storage_index, share_size, block_size, num_segments, total_shares, needed_shares, servers_of_happiness): """ - @return: (upload_servers, already_peers), where upload_servers is a set of - PeerTracker instances that have agreed to hold some shares - for us (the shareids are stashed inside the PeerTracker), - and already_peers is a dict mapping shnum to a set of peers - which claim to already have the share. + @return: (upload_servers, already_servers), where upload_servers is + a set of ServerTracker instances that have agreed to hold + some shares for us (the shareids are stashed inside the + ServerTracker), and already_servers is a dict mapping shnum + to a set of servers which claim to already have the share. """ if self._status: - self._status.set_status("Contacting Peers..") + self._status.set_status("Contacting Servers..") self.total_shares = total_shares self.servers_of_happiness = servers_of_happiness self.needed_shares = needed_shares self.homeless_shares = set(range(total_shares)) - self.contacted_peers = [] # peers worth asking again - self.contacted_peers2 = [] # peers that we have asked again + self.contacted_servers = [] # servers worth asking again + self.contacted_servers2 = [] # servers that we have asked again self._started_second_pass = False - self.use_peers = set() # PeerTrackers that have shares assigned to them - self.preexisting_shares = {} # shareid => set(peerids) holding shareid + self.use_servers = set() # ServerTrackers that have shares assigned + # to them + self.preexisting_shares = {} # shareid => set(serverids) holding shareid # We don't try to allocate shares to these servers, since they've said # that they're incapable of storing shares of the size that we'd want # to store. We keep them around because they may have existing shares # for this storage index, which we want to know about for accurate # servers_of_happiness accounting # (this is eventually a list, but it is initialized later) - self.readonly_peers = None - # These peers have shares -- any shares -- for our SI. We keep + self.readonly_servers = None + # These servers have shares -- any shares -- for our SI. We keep # track of these to write an error message with them later. - self.peers_with_shares = set() + self.servers_with_shares = set() # this needed_hashes computation should mirror # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree @@ -214,22 +215,22 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): num_share_hashes, EXTENSION_SIZE, None) allocated_size = wbp.get_allocated_size() - all_peers = [(s.get_serverid(), s.get_rref()) - for s in storage_broker.get_servers_for_psi(storage_index)] - if not all_peers: - raise NoServersError("client gave us zero peers") + all_servers = [(s.get_serverid(), s.get_rref()) + for s in storage_broker.get_servers_for_psi(storage_index)] + if not all_servers: + raise NoServersError("client gave us zero servers") - # filter the list of peers according to which ones can accomodate - # this request. This excludes older peers (which used a 4-byte size + # filter the list of servers according to which ones can accomodate + # this request. This excludes older servers (which used a 4-byte size # field) from getting large shares (for files larger than about # 12GiB). See #439 for details. - def _get_maxsize(peer): - (peerid, conn) = peer + def _get_maxsize(server): + (serverid, conn) = server v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] - writable_peers = [peer for peer in all_peers - if _get_maxsize(peer) >= allocated_size] - readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers) + writable_servers = [server for server in all_servers + if _get_maxsize(server) >= allocated_size] + readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers) # decide upon the renewal/cancel secrets, to include them in the # allocate_buckets query. @@ -240,61 +241,61 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): storage_index) file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, storage_index) - def _make_trackers(peers): - return [PeerTracker(peerid, conn, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - bucket_renewal_secret_hash(file_renewal_secret, - peerid), - bucket_cancel_secret_hash(file_cancel_secret, - peerid)) - for (peerid, conn) in peers] - self.uncontacted_peers = _make_trackers(writable_peers) - self.readonly_peers = _make_trackers(readonly_peers) - # We now ask peers that can't hold any new shares about existing + def _make_trackers(servers): + return [ServerTracker(serverid, conn, + share_size, block_size, + num_segments, num_share_hashes, + storage_index, + bucket_renewal_secret_hash(file_renewal_secret, + serverid), + bucket_cancel_secret_hash(file_cancel_secret, + serverid)) + for (serverid, conn) in servers] + self.uncontacted_servers = _make_trackers(writable_servers) + self.readonly_servers = _make_trackers(readonly_servers) + # We now ask servers that can't hold any new shares about existing # shares that they might have for our SI. Once this is done, we # start placing the shares that we haven't already accounted # for. ds = [] - if self._status and self.readonly_peers: - self._status.set_status("Contacting readonly peers to find " + if self._status and self.readonly_servers: + self._status.set_status("Contacting readonly servers to find " "any existing shares") - for peer in self.readonly_peers: - assert isinstance(peer, PeerTracker) - d = peer.ask_about_existing_shares() - d.addBoth(self._handle_existing_response, peer.peerid) + for server in self.readonly_servers: + assert isinstance(server, ServerTracker) + d = server.ask_about_existing_shares() + d.addBoth(self._handle_existing_response, server.serverid) ds.append(d) - self.num_peers_contacted += 1 + self.num_servers_contacted += 1 self.query_count += 1 - self.log("asking peer %s for any existing shares" % - (idlib.shortnodeid_b2a(peer.peerid),), + self.log("asking server %s for any existing shares" % + (idlib.shortnodeid_b2a(server.serverid),), level=log.NOISY) dl = defer.DeferredList(ds) dl.addCallback(lambda ign: self._loop()) return dl - def _handle_existing_response(self, res, peer): + def _handle_existing_response(self, res, server): """ I handle responses to the queries sent by - Tahoe2PeerSelector._existing_shares. + Tahoe2ServerSelector._existing_shares. """ if isinstance(res, failure.Failure): self.log("%s got error during existing shares check: %s" - % (idlib.shortnodeid_b2a(peer), res), + % (idlib.shortnodeid_b2a(server), res), level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 else: buckets = res if buckets: - self.peers_with_shares.add(peer) - self.log("response to get_buckets() from peer %s: alreadygot=%s" - % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))), + self.servers_with_shares.add(server) + self.log("response to get_buckets() from server %s: alreadygot=%s" + % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))), level=log.NOISY) for bucket in buckets: - self.preexisting_shares.setdefault(bucket, set()).add(peer) + self.preexisting_shares.setdefault(bucket, set()).add(server) self.homeless_shares.discard(bucket) self.full_count += 1 self.bad_query_count += 1 @@ -310,36 +311,37 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): len(self.homeless_shares))) return (msg + "want to place shares on at least %d servers such that " "any %d of them have enough shares to recover the file, " - "sent %d queries to %d peers, " + "sent %d queries to %d servers, " "%d queries placed some shares, %d placed none " "(of which %d placed none due to the server being" " full and %d placed none due to an error)" % (self.servers_of_happiness, self.needed_shares, - self.query_count, self.num_peers_contacted, + self.query_count, self.num_servers_contacted, self.good_query_count, self.bad_query_count, self.full_count, self.error_count)) def _loop(self): if not self.homeless_shares: - merged = merge_peers(self.preexisting_shares, self.use_peers) + merged = merge_peers(self.preexisting_shares, self.use_servers) effective_happiness = servers_of_happiness(merged) if self.servers_of_happiness <= effective_happiness: msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " - "self.use_peers: %s, self.preexisting_shares: %s") \ - % (self, self._get_progress_message(), - pretty_print_shnum_to_servers(merged), - [', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()]) - for p in self.use_peers], - pretty_print_shnum_to_servers(self.preexisting_shares)) + "self.use_servers: %s, self.preexisting_shares: %s") \ + % (self, self._get_progress_message(), + pretty_print_shnum_to_servers(merged), + [', '.join([str_shareloc(k,v) + for k,v in s.buckets.iteritems()]) + for s in self.use_servers], + pretty_print_shnum_to_servers(self.preexisting_shares)) self.log(msg, level=log.OPERATIONAL) - return (self.use_peers, self.preexisting_shares) + return (self.use_servers, self.preexisting_shares) else: # We're not okay right now, but maybe we can fix it by # redistributing some shares. In cases where one or two # servers has, before the upload, all or most of the # shares for a given SI, this can work by allowing _loop - # a chance to spread those out over the other peers, + # a chance to spread those out over the other servers, delta = self.servers_of_happiness - effective_happiness shares = shares_by_server(self.preexisting_shares) # Each server in shares maps to a set of shares stored on it. @@ -350,7 +352,7 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): shares_to_spread = sum([len(list(sharelist)) - 1 for (server, sharelist) in shares.items()]) - if delta <= len(self.uncontacted_peers) and \ + if delta <= len(self.uncontacted_servers) and \ shares_to_spread >= delta: items = shares.items() while len(self.homeless_shares) < delta: @@ -366,16 +368,16 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): if not self.preexisting_shares[share]: del self.preexisting_shares[share] items.append((server, sharelist)) - for writer in self.use_peers: + for writer in self.use_servers: writer.abort_some_buckets(self.homeless_shares) return self._loop() else: # Redistribution won't help us; fail. - peer_count = len(self.peers_with_shares) - failmsg = failure_message(peer_count, - self.needed_shares, - self.servers_of_happiness, - effective_happiness) + server_count = len(self.servers_with_shares) + failmsg = failure_message(server_count, + self.needed_shares, + self.servers_of_happiness, + effective_happiness) servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s" servmsg = servmsgtempl % ( self, @@ -386,63 +388,62 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): self.log(servmsg, level=log.INFREQUENT) return self._failed("%s (%s)" % (failmsg, self._get_progress_message())) - if self.uncontacted_peers: - peer = self.uncontacted_peers.pop(0) - # TODO: don't pre-convert all peerids to PeerTrackers - assert isinstance(peer, PeerTracker) + if self.uncontacted_servers: + server = self.uncontacted_servers.pop(0) + # TODO: don't pre-convert all serverids to ServerTrackers + assert isinstance(server, ServerTracker) shares_to_ask = set(sorted(self.homeless_shares)[:1]) self.homeless_shares -= shares_to_ask self.query_count += 1 - self.num_peers_contacted += 1 + self.num_servers_contacted += 1 if self._status: - self._status.set_status("Contacting Peers [%s] (first query)," + self._status.set_status("Contacting Servers [%s] (first query)," " %d shares left.." - % (idlib.shortnodeid_b2a(peer.peerid), + % (idlib.shortnodeid_b2a(server.serverid), len(self.homeless_shares))) - d = peer.query(shares_to_ask) - d.addBoth(self._got_response, peer, shares_to_ask, - self.contacted_peers) + d = server.query(shares_to_ask) + d.addBoth(self._got_response, server, shares_to_ask, + self.contacted_servers) return d - elif self.contacted_peers: - # ask a peer that we've already asked. + elif self.contacted_servers: + # ask a server that we've already asked. if not self._started_second_pass: self.log("starting second pass", level=log.NOISY) self._started_second_pass = True num_shares = mathutil.div_ceil(len(self.homeless_shares), - len(self.contacted_peers)) - peer = self.contacted_peers.pop(0) + len(self.contacted_servers)) + server = self.contacted_servers.pop(0) shares_to_ask = set(sorted(self.homeless_shares)[:num_shares]) self.homeless_shares -= shares_to_ask self.query_count += 1 if self._status: - self._status.set_status("Contacting Peers [%s] (second query)," + self._status.set_status("Contacting Servers [%s] (second query)," " %d shares left.." - % (idlib.shortnodeid_b2a(peer.peerid), + % (idlib.shortnodeid_b2a(server.serverid), len(self.homeless_shares))) - d = peer.query(shares_to_ask) - d.addBoth(self._got_response, peer, shares_to_ask, - self.contacted_peers2) + d = server.query(shares_to_ask) + d.addBoth(self._got_response, server, shares_to_ask, + self.contacted_servers2) return d - elif self.contacted_peers2: + elif self.contacted_servers2: # we've finished the second-or-later pass. Move all the remaining - # peers back into self.contacted_peers for the next pass. - self.contacted_peers.extend(self.contacted_peers2) - self.contacted_peers2[:] = [] + # servers back into self.contacted_servers for the next pass. + self.contacted_servers.extend(self.contacted_servers2) + self.contacted_servers2[:] = [] return self._loop() else: - # no more peers. If we haven't placed enough shares, we fail. - merged = merge_peers(self.preexisting_shares, self.use_peers) + # no more servers. If we haven't placed enough shares, we fail. + merged = merge_peers(self.preexisting_shares, self.use_servers) effective_happiness = servers_of_happiness(merged) if effective_happiness < self.servers_of_happiness: - msg = failure_message(len(self.peers_with_shares), + msg = failure_message(len(self.servers_with_shares), self.needed_shares, self.servers_of_happiness, effective_happiness) - msg = ("peer selection failed for %s: %s (%s)" % (self, - msg, - self._get_progress_message())) + msg = ("server selection failed for %s: %s (%s)" % + (self, msg, self._get_progress_message())) if self.last_failure_msg: msg += " (%s)" % (self.last_failure_msg,) self.log(msg, level=log.UNUSUAL) @@ -454,53 +455,53 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): msg = ("server selection successful (no more servers) for %s: %s: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged))) self.log(msg, level=log.OPERATIONAL) - return (self.use_peers, self.preexisting_shares) + return (self.use_servers, self.preexisting_shares) - def _got_response(self, res, peer, shares_to_ask, put_peer_here): + def _got_response(self, res, server, shares_to_ask, put_server_here): if isinstance(res, failure.Failure): # This is unusual, and probably indicates a bug or a network # problem. - self.log("%s got error during peer selection: %s" % (peer, res), + self.log("%s got error during server selection: %s" % (server, res), level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 self.homeless_shares |= shares_to_ask - if (self.uncontacted_peers - or self.contacted_peers - or self.contacted_peers2): + if (self.uncontacted_servers + or self.contacted_servers + or self.contacted_servers2): # there is still hope, so just loop pass else: - # No more peers, so this upload might fail (it depends upon + # No more servers, so this upload might fail (it depends upon # whether we've hit servers_of_happiness or not). Log the last - # failure we got: if a coding error causes all peers to fail + # failure we got: if a coding error causes all servers to fail # in the same way, this allows the common failure to be seen # by the uploader and should help with debugging - msg = ("last failure (from %s) was: %s" % (peer, res)) + msg = ("last failure (from %s) was: %s" % (server, res)) self.last_failure_msg = msg else: (alreadygot, allocated) = res - self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s" - % (idlib.shortnodeid_b2a(peer.peerid), + self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s" + % (idlib.shortnodeid_b2a(server.serverid), tuple(sorted(alreadygot)), tuple(sorted(allocated))), level=log.NOISY) progress = False for s in alreadygot: - self.preexisting_shares.setdefault(s, set()).add(peer.peerid) + self.preexisting_shares.setdefault(s, set()).add(server.serverid) if s in self.homeless_shares: self.homeless_shares.remove(s) progress = True elif s in shares_to_ask: progress = True - # the PeerTracker will remember which shares were allocated on + # the ServerTracker will remember which shares were allocated on # that peer. We just have to remember to use them. if allocated: - self.use_peers.add(peer) + self.use_servers.add(server) progress = True if allocated or alreadygot: - self.peers_with_shares.add(peer.peerid) + self.servers_with_shares.add(server.serverid) not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) @@ -517,11 +518,11 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): if still_homeless: # In networks with lots of space, this is very unusual and - # probably indicates an error. In networks with peers that + # probably indicates an error. In networks with servers that # are full, it is merely unusual. In networks that are very # full, it is common, and many uploads will fail. In most # cases, this is obviously not fatal, and we'll just use some - # other peers. + # other servers. # some shares are still homeless, keep trying to find them a # home. The ones that were rejected get first priority. @@ -531,7 +532,7 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): else: # if they *were* able to accept everything, they might be # willing to accept even more. - put_peer_here.append(peer) + put_server_here.append(server) # now loop return self._loop() @@ -539,15 +540,15 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): def _failed(self, msg): """ - I am called when peer selection fails. I first abort all of the + I am called when server selection fails. I first abort all of the remote buckets that I allocated during my unsuccessful attempt to place shares for this file. I then raise an UploadUnhappinessError with my msg argument. """ - for peer in self.use_peers: - assert isinstance(peer, PeerTracker) + for server in self.use_servers: + assert isinstance(server, ServerTracker) - peer.abort() + server.abort() raise UploadUnhappinessError(msg) @@ -825,10 +826,10 @@ class UploadStatus: self.results = value class CHKUploader: - peer_selector_class = Tahoe2PeerSelector + server_selector_class = Tahoe2ServerSelector def __init__(self, storage_broker, secret_holder): - # peer_selector needs storage_broker and secret_holder + # server_selector needs storage_broker and secret_holder self._storage_broker = storage_broker self._secret_holder = secret_holder self._log_number = self.log("CHKUploader starting", parent=None) @@ -841,7 +842,7 @@ class CHKUploader: self._upload_status.set_results(self._results) # locate_all_shareholders() will create the following attribute: - # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker + # self._server_trackers = {} # k: shnum, v: instance of ServerTracker def log(self, *args, **kwargs): if "parent" not in kwargs: @@ -892,7 +893,7 @@ class CHKUploader: return d def locate_all_shareholders(self, encoder, started): - peer_selection_started = now = time.time() + server_selection_started = now = time.time() self._storage_index_elapsed = now - started storage_broker = self._storage_broker secret_holder = self._secret_holder @@ -900,55 +901,58 @@ class CHKUploader: self._storage_index = storage_index upload_id = si_b2a(storage_index)[:5] self.log("using storage index %s" % upload_id) - peer_selector = self.peer_selector_class(upload_id, self._log_number, - self._upload_status) + server_selector = self.server_selector_class(upload_id, + self._log_number, + self._upload_status) share_size = encoder.get_param("share_size") block_size = encoder.get_param("block_size") num_segments = encoder.get_param("num_segments") k,desired,n = encoder.get_param("share_counts") - self._peer_selection_started = time.time() - d = peer_selector.get_shareholders(storage_broker, secret_holder, - storage_index, - share_size, block_size, - num_segments, n, k, desired) + self._server_selection_started = time.time() + d = server_selector.get_shareholders(storage_broker, secret_holder, + storage_index, + share_size, block_size, + num_segments, n, k, desired) def _done(res): - self._peer_selection_elapsed = time.time() - peer_selection_started + self._server_selection_elapsed = time.time() - server_selection_started return res d.addCallback(_done) return d - def set_shareholders(self, (upload_servers, already_peers), encoder): + def set_shareholders(self, (upload_servers, already_servers), encoder): """ - @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some - shares for us (the shareids are stashed inside the PeerTracker) - @paran already_peers: a dict mapping sharenum to a set of peerids - that claim to already have this share + @param upload_servers: a sequence of ServerTracker objects that + have agreed to hold some shares for us (the + shareids are stashed inside the ServerTracker) + @paran already_servers: a dict mapping sharenum to a set of serverids + that claim to already have this share """ - msgtempl = "set_shareholders; upload_servers is %s, already_peers is %s" - values = ([', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()]) - for p in upload_servers], already_peers) + msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s" + values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()]) + for s in upload_servers], already_servers) self.log(msgtempl % values, level=log.OPERATIONAL) # record already-present shares in self._results - self._results.preexisting_shares = len(already_peers) + self._results.preexisting_shares = len(already_servers) - self._peer_trackers = {} # k: shnum, v: instance of PeerTracker - for peer in upload_servers: - assert isinstance(peer, PeerTracker) + self._server_trackers = {} # k: shnum, v: instance of ServerTracker + for server in upload_servers: + assert isinstance(server, ServerTracker) buckets = {} - servermap = already_peers.copy() - for peer in upload_servers: - buckets.update(peer.buckets) - for shnum in peer.buckets: - self._peer_trackers[shnum] = peer - servermap.setdefault(shnum, set()).add(peer.peerid) - assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), \ + servermap = already_servers.copy() + for server in upload_servers: + buckets.update(server.buckets) + for shnum in server.buckets: + self._server_trackers[shnum] = server + servermap.setdefault(shnum, set()).add(server.serverid) + assert len(buckets) == sum([len(server.buckets) + for server in upload_servers]), \ "%s (%s) != %s (%s)" % ( len(buckets), buckets, - sum([len(peer.buckets) for peer in upload_servers]), - [(p.buckets, p.peerid) for p in upload_servers] + sum([len(server.buckets) for server in upload_servers]), + [(s.buckets, s.serverid) for s in upload_servers] ) encoder.set_shareholders(buckets, servermap) @@ -956,16 +960,16 @@ class CHKUploader: """ Returns a Deferred that will fire with the UploadResults instance. """ r = self._results for shnum in self._encoder.get_shares_placed(): - peer_tracker = self._peer_trackers[shnum] - peerid = peer_tracker.peerid - r.sharemap.add(shnum, peerid) - r.servermap.add(peerid, shnum) + server_tracker = self._server_trackers[shnum] + serverid = server_tracker.serverid + r.sharemap.add(shnum, serverid) + r.servermap.add(serverid, shnum) r.pushed_shares = len(self._encoder.get_shares_placed()) now = time.time() r.file_size = self._encoder.file_size r.timings["total"] = now - self._started r.timings["storage_index"] = self._storage_index_elapsed - r.timings["peer_selection"] = self._peer_selection_elapsed + r.timings["peer_selection"] = self._server_selection_elapsed r.timings.update(self._encoder.get_times()) r.uri_extension_data = self._encoder.get_uri_extension_data() r.verifycapstr = verifycap.to_string() diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index e7368604..03cb6a1e 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -193,12 +193,12 @@ class FakeClient: self.num_servers = num_servers if type(mode) is str: mode = dict([i,mode] for i in range(num_servers)) - peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid])) - for fakeid in range(self.num_servers) ] + servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid])) + for fakeid in range(self.num_servers) ] self.storage_broker = StorageFarmBroker(None, permute_peers=True) - for (serverid, rref) in peers: + for (serverid, rref) in servers: self.storage_broker.test_add_rref(serverid, rref) - self.last_peers = [p[1] for p in peers] + self.last_servers = [s[1] for s in servers] def log(self, *args, **kwargs): pass @@ -411,7 +411,7 @@ class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin): def test_first_error_all(self): self.make_node("first-fail") d = self.shouldFail(UploadUnhappinessError, "first_error_all", - "peer selection failed", + "server selection failed", upload_data, self.u, DATA) def _check((f,)): self.failUnlessIn("placed 0 shares out of 100 total", str(f.value)) @@ -443,7 +443,7 @@ class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin): def test_second_error_all(self): self.make_node("second-fail") d = self.shouldFail(UploadUnhappinessError, "second_error_all", - "peer selection failed", + "server selection failed", upload_data, self.u, DATA) def _check((f,)): self.failUnlessIn("placed 10 shares out of 100 total", str(f.value)) @@ -468,7 +468,7 @@ class FullServer(unittest.TestCase): d.addBoth(self._should_fail) return d -class PeerSelection(unittest.TestCase): +class ServerSelection(unittest.TestCase): def make_client(self, num_servers=50): self.node = FakeClient(mode="good", num_servers=num_servers) @@ -497,8 +497,8 @@ class PeerSelection(unittest.TestCase): self.node.DEFAULT_ENCODING_PARAMETERS = p def test_one_each(self): - # if we have 50 shares, and there are 50 peers, and they all accept a - # share, we should get exactly one share per peer + # if we have 50 shares, and there are 50 servers, and they all accept + # a share, we should get exactly one share per server self.make_client() data = self.get_data(SIZE_LARGE) @@ -507,35 +507,35 @@ class PeerSelection(unittest.TestCase): d.addCallback(extract_uri) d.addCallback(self._check_large, SIZE_LARGE) def _check(res): - for p in self.node.last_peers: - allocated = p.allocated + for s in self.node.last_servers: + allocated = s.allocated self.failUnlessEqual(len(allocated), 1) - self.failUnlessEqual(p.queries, 1) + self.failUnlessEqual(s.queries, 1) d.addCallback(_check) return d def test_two_each(self): - # if we have 100 shares, and there are 50 peers, and they all accept - # all shares, we should get exactly two shares per peer + # if we have 100 shares, and there are 50 servers, and they all + # accept all shares, we should get exactly two shares per server self.make_client() data = self.get_data(SIZE_LARGE) - # if there are 50 peers, then happy needs to be <= 50 + # if there are 50 servers, then happy needs to be <= 50 self.set_encoding_parameters(50, 50, 100) d = upload_data(self.u, data) d.addCallback(extract_uri) d.addCallback(self._check_large, SIZE_LARGE) def _check(res): - for p in self.node.last_peers: - allocated = p.allocated + for s in self.node.last_servers: + allocated = s.allocated self.failUnlessEqual(len(allocated), 2) - self.failUnlessEqual(p.queries, 2) + self.failUnlessEqual(s.queries, 2) d.addCallback(_check) return d def test_one_each_plus_one_extra(self): - # if we have 51 shares, and there are 50 peers, then one peer gets - # two shares and the rest get just one + # if we have 51 shares, and there are 50 servers, then one server + # gets two shares and the rest get just one self.make_client() data = self.get_data(SIZE_LARGE) @@ -546,38 +546,38 @@ class PeerSelection(unittest.TestCase): def _check(res): got_one = [] got_two = [] - for p in self.node.last_peers: - allocated = p.allocated + for s in self.node.last_servers: + allocated = s.allocated self.failUnless(len(allocated) in (1,2), len(allocated)) if len(allocated) == 1: - self.failUnlessEqual(p.queries, 1) - got_one.append(p) + self.failUnlessEqual(s.queries, 1) + got_one.append(s) else: - self.failUnlessEqual(p.queries, 2) - got_two.append(p) + self.failUnlessEqual(s.queries, 2) + got_two.append(s) self.failUnlessEqual(len(got_one), 49) self.failUnlessEqual(len(got_two), 1) d.addCallback(_check) return d def test_four_each(self): - # if we have 200 shares, and there are 50 peers, then each peer gets - # 4 shares. The design goal is to accomplish this with only two - # queries per peer. + # if we have 200 shares, and there are 50 servers, then each server + # gets 4 shares. The design goal is to accomplish this with only two + # queries per server. self.make_client() data = self.get_data(SIZE_LARGE) - # if there are 50 peers, then happy should be no more than 50 if - # we want this to work. + # if there are 50 servers, then happy should be no more than 50 if we + # want this to work. self.set_encoding_parameters(100, 50, 200) d = upload_data(self.u, data) d.addCallback(extract_uri) d.addCallback(self._check_large, SIZE_LARGE) def _check(res): - for p in self.node.last_peers: - allocated = p.allocated + for s in self.node.last_servers: + allocated = s.allocated self.failUnlessEqual(len(allocated), 4) - self.failUnlessEqual(p.queries, 2) + self.failUnlessEqual(s.queries, 2) d.addCallback(_check) return d @@ -593,8 +593,8 @@ class PeerSelection(unittest.TestCase): d.addCallback(self._check_large, SIZE_LARGE) def _check(res): counts = {} - for p in self.node.last_peers: - allocated = p.allocated + for s in self.node.last_servers: + allocated = s.allocated counts[len(allocated)] = counts.get(len(allocated), 0) + 1 histogram = [counts.get(i, 0) for i in range(5)] self.failUnlessEqual(histogram, [0,0,0,2,1]) @@ -616,10 +616,10 @@ class PeerSelection(unittest.TestCase): d.addCallback(extract_uri) d.addCallback(self._check_large, SIZE_LARGE) def _check(res): - # we should have put one share each on the big peers, and zero - # shares on the small peers + # we should have put one share each on the big servers, and zero + # shares on the small servers total_allocated = 0 - for p in self.node.last_peers: + for p in self.node.last_servers: if p.mode == "good": self.failUnlessEqual(len(p.allocated), 1) elif p.mode == "small": @@ -750,8 +750,9 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, def _do_upload_with_broken_servers(self, servers_to_break): """ I act like a normal upload, but before I send the results of - Tahoe2PeerSelector to the Encoder, I break the first servers_to_break - PeerTrackers in the upload_servers part of the return result. + Tahoe2ServerSelector to the Encoder, I break the first + servers_to_break ServerTrackers in the upload_servers part of the + return result. """ assert self.g, "I tried to find a grid at self.g, but failed" broker = self.g.clients[0].storage_broker @@ -764,7 +765,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, encoder = encode.Encoder() encoder.set_encrypted_uploadable(uploadable) status = upload.UploadStatus() - selector = upload.Tahoe2PeerSelector("dglev", "test", status) + selector = upload.Tahoe2ServerSelector("dglev", "test", status) storage_index = encoder.get_param("storage_index") share_size = encoder.get_param("share_size") block_size = encoder.get_param("block_size") @@ -772,18 +773,18 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d = selector.get_shareholders(broker, sh, storage_index, share_size, block_size, num_segments, 10, 3, 4) - def _have_shareholders((upload_servers, already_peers)): + def _have_shareholders((upload_servers, already_servers)): assert servers_to_break <= len(upload_servers) for index in xrange(servers_to_break): server = list(upload_servers)[index] for share in server.buckets.keys(): server.buckets[share].abort() buckets = {} - servermap = already_peers.copy() - for peer in upload_servers: - buckets.update(peer.buckets) - for bucket in peer.buckets: - servermap.setdefault(bucket, set()).add(peer.peerid) + servermap = already_servers.copy() + for server in upload_servers: + buckets.update(server.buckets) + for bucket in server.buckets: + servermap.setdefault(bucket, set()).add(server.serverid) encoder.set_shareholders(buckets, servermap) d = encoder.start() return d @@ -1054,7 +1055,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # one share from our initial upload to each of these. # The counterintuitive ordering of the share numbers is to deal with # the permuting of these servers -- distributing the shares this - # way ensures that the Tahoe2PeerSelector sees them in the order + # way ensures that the Tahoe2ServerSelector sees them in the order # described below. d = self._setup_and_upload() d.addCallback(lambda ign: @@ -1069,7 +1070,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # server 2: share 0 # server 3: share 1 # We change the 'happy' parameter in the client to 4. - # The Tahoe2PeerSelector will see the peers permuted as: + # The Tahoe2ServerSelector will see the servers permuted as: # 2, 3, 1, 0 # Ideally, a reupload of our original data should work. def _reset_encoding_parameters(ign, happy=4): @@ -1084,17 +1085,17 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # This scenario is basically comment:53, but changed so that the - # Tahoe2PeerSelector sees the server with all of the shares before + # Tahoe2ServerSelector sees the server with all of the shares before # any of the other servers. # The layout is: # server 2: shares 0 - 9 # server 3: share 0 # server 1: share 1 # server 4: share 2 - # The Tahoe2PeerSelector sees the peers permuted as: + # The Tahoe2ServerSelector sees the servers permuted as: # 2, 3, 1, 4 # Note that server 0 has been replaced by server 4; this makes it - # easier to ensure that the last server seen by Tahoe2PeerSelector + # easier to ensure that the last server seen by Tahoe2ServerSelector # has only one share. d.addCallback(_change_basedir) d.addCallback(lambda ign: @@ -1124,7 +1125,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # Try the same thing, but with empty servers after the first one - # We want to make sure that Tahoe2PeerSelector will redistribute + # We want to make sure that Tahoe2ServerSelector will redistribute # shares as necessary, not simply discover an existing layout. # The layout is: # server 2: shares 0 - 9 @@ -1184,7 +1185,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, return d test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release." - def test_happiness_with_some_readonly_peers(self): + def test_happiness_with_some_readonly_servers(self): # Try the following layout # server 2: shares 0-9 # server 4: share 0, read-only @@ -1223,13 +1224,13 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, return d - def test_happiness_with_all_readonly_peers(self): + def test_happiness_with_all_readonly_servers(self): # server 3: share 1, read-only # server 1: share 2, read-only # server 2: shares 0-9, read-only # server 4: share 0, read-only # The idea with this test is to make sure that the survey of - # read-only peers doesn't undercount servers of happiness + # read-only servers doesn't undercount servers of happiness self.basedir = self.mktemp() d = self._setup_and_upload() d.addCallback(lambda ign: @@ -1268,7 +1269,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # the layout presented to it satisfies "servers_of_happiness" # until a failure occurs) # - # This test simulates an upload where servers break after peer + # This test simulates an upload where servers break after server # selection, but before they are written to. def _set_basedir(ign=None): self.basedir = self.mktemp() @@ -1283,7 +1284,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, self._add_server(server_number=5) d.addCallback(_do_server_setup) # remove the original server - # (necessary to ensure that the Tahoe2PeerSelector will distribute + # (necessary to ensure that the Tahoe2ServerSelector will distribute # all the shares) def _remove_server(ign): server = self.g.servers_by_number[0] @@ -1343,7 +1344,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, def test_merge_peers(self): # merge_peers merges a list of upload_servers and a dict of - # shareid -> peerid mappings. + # shareid -> serverid mappings. shares = { 1 : set(["server1"]), 2 : set(["server2"]), @@ -1354,12 +1355,12 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # if not provided with a upload_servers argument, it should just # return the first argument unchanged. self.failUnlessEqual(shares, merge_peers(shares, set([]))) - class FakePeerTracker: + class FakeServerTracker: pass trackers = [] for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]: - t = FakePeerTracker() - t.peerid = server + t = FakeServerTracker() + t.serverid = server t.buckets = [i] trackers.append(t) expected = { @@ -1386,8 +1387,8 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, expected = {} for (i, server) in [(i, "server%d" % i) for i in xrange(10)]: shares3[i] = set([server]) - t = FakePeerTracker() - t.peerid = server + t = FakeServerTracker() + t.serverid = server t.buckets = [i] trackers.append(t) expected[i] = set([server]) @@ -1403,7 +1404,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # value for given inputs. # servers_of_happiness expects a dict of - # shnum => set(peerids) as a preexisting shares argument. + # shnum => set(serverids) as a preexisting shares argument. test1 = { 1 : set(["server1"]), 2 : set(["server2"]), @@ -1417,22 +1418,22 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # should be 3 instead of 4. happy = servers_of_happiness(test1) self.failUnlessEqual(3, happy) - # The second argument of merge_peers should be a set of - # objects with peerid and buckets as attributes. In actual use, - # these will be PeerTracker instances, but for testing it is fine - # to make a FakePeerTracker whose job is to hold those instance - # variables to test that part. - class FakePeerTracker: + # The second argument of merge_peers should be a set of objects with + # serverid and buckets as attributes. In actual use, these will be + # ServerTracker instances, but for testing it is fine to make a + # FakeServerTracker whose job is to hold those instance variables to + # test that part. + class FakeServerTracker: pass trackers = [] for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]: - t = FakePeerTracker() - t.peerid = server + t = FakeServerTracker() + t.serverid = server t.buckets = [i] trackers.append(t) # Recall that test1 is a server layout with servers_of_happiness # = 3. Since there isn't any overlap between the shnum -> - # set([peerid]) correspondences in test1 and those in trackers, + # set([serverid]) correspondences in test1 and those in trackers, # the result here should be 7. test2 = merge_peers(test1, set(trackers)) happy = servers_of_happiness(test2) @@ -1440,8 +1441,8 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # Now add an overlapping server to trackers. This is redundant, # so it should not cause the previously reported happiness value # to change. - t = FakePeerTracker() - t.peerid = "server1" + t = FakeServerTracker() + t.serverid = "server1" t.buckets = [1] trackers.append(t) test2 = merge_peers(test1, set(trackers)) @@ -1459,17 +1460,17 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, 4 : set(['server4']), } trackers = [] - t = FakePeerTracker() - t.peerid = 'server5' + t = FakeServerTracker() + t.serverid = 'server5' t.buckets = [4] trackers.append(t) - t = FakePeerTracker() - t.peerid = 'server6' + t = FakeServerTracker() + t.serverid = 'server6' t.buckets = [3, 5] trackers.append(t) # The value returned by servers_of_happiness is the size # of a maximum matching in the bipartite graph that - # servers_of_happiness() makes between peerids and share + # servers_of_happiness() makes between serverids and share # numbers. It should find something like this: # (server 1, share 1) # (server 2, share 2) @@ -1527,7 +1528,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, sbs = shares_by_server(test1) self.failUnlessEqual(set([1, 2, 3]), sbs["server1"]) self.failUnlessEqual(set([4, 5]), sbs["server2"]) - # This should fail unless the peerid part of the mapping is a set + # This should fail unless the serverid part of the mapping is a set test2 = {1: "server1"} self.shouldFail(AssertionError, "test_shares_by_server", @@ -1543,7 +1544,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # server 2: empty # server 3: empty # server 4: empty - # The purpose of this test is to make sure that the peer selector + # The purpose of this test is to make sure that the server selector # knows about the shares on server 1, even though it is read-only. # It used to simply filter these out, which would cause the test # to fail when servers_of_happiness = 4. @@ -1574,7 +1575,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, def test_query_counting(self): - # If peer selection fails, Tahoe2PeerSelector prints out a lot + # If server selection fails, Tahoe2ServerSelector prints out a lot # of helpful diagnostic information, including query stats. # This test helps make sure that that information is accurate. self.basedir = self.mktemp() @@ -1597,7 +1598,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, c.upload, upload.Data("data" * 10000, convergence=""))) # Now try with some readonly servers. We want to make sure that - # the readonly peer share discovery phase is counted correctly. + # the readonly server share discovery phase is counted correctly. def _reset(ign): self.basedir = self.mktemp() self.g = None @@ -1668,13 +1669,13 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d.addCallback(lambda client: self.shouldFail(UploadUnhappinessError, "test_upper_limit_on_readonly_queries", - "sent 8 queries to 8 peers", + "sent 8 queries to 8 servers", client.upload, upload.Data('data' * 10000, convergence=""))) return d - def test_exception_messages_during_peer_selection(self): + def test_exception_messages_during_server_selection(self): # server 1: read-only, no shares # server 2: read-only, no shares # server 3: read-only, no shares @@ -1707,7 +1708,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, "total (10 homeless), want to place shares on at " "least 4 servers such that any 3 of them have " "enough shares to recover the file, " - "sent 5 queries to 5 peers, 0 queries placed " + "sent 5 queries to 5 servers, 0 queries placed " "some shares, 5 placed none " "(of which 5 placed none due to the server being " "full and 0 placed none due to an error)", @@ -1748,7 +1749,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, "total (10 homeless), want to place shares on at " "least 4 servers such that any 3 of them have " "enough shares to recover the file, " - "sent 5 queries to 5 peers, 0 queries placed " + "sent 5 queries to 5 servers, 0 queries placed " "some shares, 5 placed none " "(of which 4 placed none due to the server being " "full and 1 placed none due to an error)", @@ -2009,9 +2010,9 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, return d - def test_peer_selector_bucket_abort(self): - # If peer selection for an upload fails due to an unhappy - # layout, the peer selection process should abort the buckets it + def test_server_selector_bucket_abort(self): + # If server selection for an upload fails due to an unhappy + # layout, the server selection process should abort the buckets it # allocates before failing, so that the space can be re-used. self.basedir = self.mktemp() self.set_up_grid(num_servers=5) @@ -2024,7 +2025,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d = defer.succeed(None) d.addCallback(lambda ignored: self.shouldFail(UploadUnhappinessError, - "test_peer_selection_bucket_abort", + "test_server_selection_bucket_abort", "", client.upload, upload.Data("data" * 10000, convergence=""))) @@ -2079,7 +2080,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, return None # TODO: -# upload with exactly 75 peers (shares_of_happiness) +# upload with exactly 75 servers (shares_of_happiness) # have a download fail # cancel a download (need to implement more cancel stuff) diff --git a/src/allmydata/util/happinessutil.py b/src/allmydata/util/happinessutil.py index 8029addd..9a5d74b2 100644 --- a/src/allmydata/util/happinessutil.py +++ b/src/allmydata/util/happinessutil.py @@ -74,7 +74,7 @@ def merge_peers(servermap, upload_servers=None): for peer in upload_servers: for shnum in peer.buckets: - servermap.setdefault(shnum, set()).add(peer.peerid) + servermap.setdefault(shnum, set()).add(peer.serverid) return servermap def servers_of_happiness(sharemap):