From: Kevan Carstensen Date: Fri, 14 May 2010 00:49:17 +0000 (-0700) Subject: Fix up the behavior of #778, per reviewers' comments X-Git-Url: https://git.rkrishnan.org/vdrive/%22file://%22news.html/%22?a=commitdiff_plain;h=e225f573b9c3fb0e37eae05ecfbeada921eb4bba;p=tahoe-lafs%2Ftahoe-lafs.git Fix up the behavior of #778, per reviewers' comments - Make some important utility functions clearer and more thoroughly documented. - Assert in upload.servers_of_happiness that the buckets attributes of PeerTrackers passed to it are mutually disjoint. - Get rid of some silly non-Pythonisms that I didn't see when I first wrote these patches. - Make sure that should_add_server returns true when queried about a shnum that it doesn't know about yet. - Change Tahoe2PeerSelector.preexisting_shares to map a shareid to a set of peerids, alter dependencies to deal with that. - Remove upload.should_add_servers, because it is no longer necessary - Move upload.shares_of_happiness and upload.shares_by_server to a utility file. - Change some points in Tahoe2PeerSelector. - Compute servers_of_happiness using a bipartite matching algorithm that we know is optimal instead of an ad-hoc greedy algorithm that isn't. - Change servers_of_happiness to just take a sharemap as an argument, change its callers to merge existing_shares and used_peers before calling it. - Change an error message in the encoder to be more appropriate for servers of happiness. - Clarify the wording of an error message in immutable/upload.py - Refactor a happiness failure message to happinessutil.py, and make immutable/upload.py and immutable/encode.py use it. - Move the word "only" as far to the right as possible in failure messages. - Use a better definition of progress during peer selection. - Do read-only peer share detection queries in parallel, not sequentially. - Clean up logging semantics; print the query statistics whenever an upload is unsuccessful, not just in one case. --- diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py index f6be4b02..c4474486 100644 --- a/src/allmydata/immutable/encode.py +++ b/src/allmydata/immutable/encode.py @@ -7,7 +7,7 @@ from foolscap.api import fireEventually from allmydata import uri from allmydata.storage.server import si_b2a from allmydata.hashtree import HashTree -from allmydata.util import mathutil, hashutil, base32, log +from allmydata.util import mathutil, hashutil, base32, log, happinessutil from allmydata.util.assertutil import _assert, precondition from allmydata.codec import CRSEncoder from allmydata.interfaces import IEncoder, IStorageBucketWriter, \ @@ -198,6 +198,8 @@ class Encoder(object): assert IStorageBucketWriter.providedBy(landlords[k]) self.landlords = landlords.copy() assert isinstance(servermap, dict) + for v in servermap.itervalues(): + assert isinstance(v, set) self.servermap = servermap.copy() def start(self): @@ -484,26 +486,33 @@ class Encoder(object): level=log.UNUSUAL, failure=why) if shareid in self.landlords: self.landlords[shareid].abort() + peerid = self.landlords[shareid].get_peerid() + assert peerid del self.landlords[shareid] + self.servermap[shareid].remove(peerid) + if not self.servermap[shareid]: + del self.servermap[shareid] else: # even more UNUSUAL self.log("they weren't in our list of landlords", parent=ln, level=log.WEIRD, umid="TQGFRw") - del(self.servermap[shareid]) - servers_left = list(set(self.servermap.values())) - if len(servers_left) < self.servers_of_happiness: - msg = "lost too many servers during upload (still have %d, want %d): %s" % \ - (len(servers_left), - self.servers_of_happiness, why) + happiness = happinessutil.servers_of_happiness(self.servermap) + if happiness < self.servers_of_happiness: + peerids = set(happinessutil.shares_by_server(self.servermap).keys()) + msg = happinessutil.failure_message(len(peerids), + self.required_shares, + self.servers_of_happiness, + happiness) + msg = "%s: %s" % (msg, why) raise UploadUnhappinessError(msg) self.log("but we can still continue with %s shares, we'll be happy " - "with at least %s" % (len(servers_left), + "with at least %s" % (happiness, self.servers_of_happiness), parent=ln) def _gather_responses(self, dl): d = defer.DeferredList(dl, fireOnOneErrback=True) - def _eatNotEnoughSharesError(f): + def _eatUploadUnhappinessError(f): # all exceptions that occur while talking to a peer are handled # in _remove_shareholder. That might raise UploadUnhappinessError, # which will cause the DeferredList to errback but which should @@ -513,7 +522,7 @@ class Encoder(object): f.trap(UploadUnhappinessError) return None for d0 in dl: - d0.addErrback(_eatNotEnoughSharesError) + d0.addErrback(_eatUploadUnhappinessError) return d def finish_hashing(self): diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py index 6ca53391..6e07da7b 100644 --- a/src/allmydata/immutable/layout.py +++ b/src/allmydata/immutable/layout.py @@ -242,6 +242,12 @@ class WriteBucketProxy: def abort(self): return self._rref.callRemoteOnly("abort") + + def get_peerid(self): + if self._nodeid: + return self._nodeid + return None + class WriteBucketProxy_v2(WriteBucketProxy): fieldsize = 8 fieldstruct = ">Q" diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index ca57cd91..4beb66f5 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -13,6 +13,9 @@ from allmydata import hashtree, uri from allmydata.storage.server import si_b2a from allmydata.immutable import encode from allmydata.util import base32, dictutil, idlib, log, mathutil +from allmydata.util.happinessutil import servers_of_happiness, \ + shares_by_server, merge_peers, \ + failure_message from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ @@ -113,10 +116,9 @@ class PeerTracker: d.addCallback(self._got_reply) return d - def query_allocated(self): - d = self._storageserver.callRemote("get_buckets", - self.storage_index) - return d + def ask_about_existing_shares(self): + return self._storageserver.callRemote("get_buckets", + self.storage_index) def _got_reply(self, (alreadygot, buckets)): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) @@ -132,52 +134,6 @@ class PeerTracker: self.buckets.update(b) return (alreadygot, set(b.keys())) -def servers_with_unique_shares(existing_shares, used_peers=None): - """ - I accept a dict of shareid -> peerid mappings (and optionally a list - of PeerTracker instances) and return a list of servers that have shares. - """ - servers = [] - existing_shares = existing_shares.copy() - if used_peers: - peerdict = {} - for peer in used_peers: - peerdict.update(dict([(i, peer.peerid) for i in peer.buckets])) - for k in peerdict.keys(): - if existing_shares.has_key(k): - # Prevent overcounting; favor the bucket, and not the - # prexisting share. - del(existing_shares[k]) - peers = list(used_peers.copy()) - # We do this because the preexisting shares list goes by peerid. - peers = [x.peerid for x in peers] - servers.extend(peers) - servers.extend(existing_shares.values()) - return list(set(servers)) - -def shares_by_server(existing_shares): - """ - I accept a dict of shareid -> peerid mappings, and return a dict - of peerid -> shareid mappings - """ - servers = {} - for server in set(existing_shares.values()): - servers[server] = set([x for x in existing_shares.keys() - if existing_shares[x] == server]) - return servers - -def should_add_server(existing_shares, server, bucket): - """ - I tell my caller whether the servers_of_happiness number will be - increased or decreased if a particular server is added as the peer - already holding a particular share. I take a dictionary, a peerid, - and a bucket as arguments, and return a boolean. - """ - old_size = len(servers_with_unique_shares(existing_shares)) - new_candidate = existing_shares.copy() - new_candidate[bucket] = server - new_size = len(servers_with_unique_shares(new_candidate)) - return old_size < new_size class Tahoe2PeerSelector: @@ -203,8 +159,8 @@ class Tahoe2PeerSelector: @return: (used_peers, already_peers), where used_peers is a set of PeerTracker instances that have agreed to hold some shares for us (the shnum is stashed inside the PeerTracker), - and already_peers is a dict mapping shnum to a peer - which claims to already have the share. + and already_peers is a dict mapping shnum to a set of peers + which claim to already have the share. """ if self._status: @@ -215,25 +171,21 @@ class Tahoe2PeerSelector: self.needed_shares = needed_shares self.homeless_shares = range(total_shares) - # self.uncontacted_peers = list() # peers we haven't asked yet self.contacted_peers = [] # peers worth asking again self.contacted_peers2 = [] # peers that we have asked again self._started_second_pass = False self.use_peers = set() # PeerTrackers that have shares assigned to them - self.preexisting_shares = {} # sharenum -> peerid holding the share - # We don't try to allocate shares to these servers, since they've - # said that they're incapable of storing shares of the size that - # we'd want to store. We keep them around because they may have - # existing shares for this storage index, which we want to know - # about for accurate servers_of_happiness accounting - self.readonly_peers = [] - # These peers have shares -- any shares -- for our SI. We keep track - # of these to write an error message with them later. - self.peers_with_shares = [] - - peers = storage_broker.get_servers_for_index(storage_index) - if not peers: - raise NoServersError("client gave us zero peers") + self.preexisting_shares = {} # shareid => set(peerids) holding shareid + # We don't try to allocate shares to these servers, since they've said + # that they're incapable of storing shares of the size that we'd want + # to store. We keep them around because they may have existing shares + # for this storage index, which we want to know about for accurate + # servers_of_happiness accounting + # (this is eventually a list, but it is initialized later) + self.readonly_peers = None + # These peers have shares -- any shares -- for our SI. We keep + # track of these to write an error message with them later. + self.peers_with_shares = set() # this needed_hashes computation should mirror # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree @@ -247,6 +199,9 @@ class Tahoe2PeerSelector: num_share_hashes, EXTENSION_SIZE, None) allocated_size = wbp.get_allocated_size() + all_peers = storage_broker.get_servers_for_index(storage_index) + if not all_peers: + raise NoServersError("client gave us zero peers") # filter the list of peers according to which ones can accomodate # this request. This excludes older peers (which used a 4-byte size @@ -256,10 +211,9 @@ class Tahoe2PeerSelector: (peerid, conn) = peer v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] - new_peers = [peer for peer in peers - if _get_maxsize(peer) >= allocated_size] - old_peers = list(set(peers).difference(set(new_peers))) - peers = new_peers + writable_peers = [peer for peer in all_peers + if _get_maxsize(peer) >= allocated_size] + readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers) # decide upon the renewal/cancel secrets, to include them in the # allocate_buckets query. @@ -271,41 +225,46 @@ class Tahoe2PeerSelector: file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, storage_index) def _make_trackers(peers): - return [ PeerTracker(peerid, conn, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - bucket_renewal_secret_hash(file_renewal_secret, - peerid), - bucket_cancel_secret_hash(file_cancel_secret, - peerid)) + return [PeerTracker(peerid, conn, + share_size, block_size, + num_segments, num_share_hashes, + storage_index, + bucket_renewal_secret_hash(file_renewal_secret, + peerid), + bucket_cancel_secret_hash(file_cancel_secret, + peerid)) for (peerid, conn) in peers] - self.uncontacted_peers = _make_trackers(peers) - self.readonly_peers = _make_trackers(old_peers) - # Talk to the readonly servers to get an idea of what servers - # have what shares (if any) for this storage index - d = defer.maybeDeferred(self._existing_shares) - d.addCallback(lambda ign: self._loop()) - return d - - def _existing_shares(self): - if self.readonly_peers: - peer = self.readonly_peers.pop() + self.uncontacted_peers = _make_trackers(writable_peers) + self.readonly_peers = _make_trackers(readonly_peers) + # We now ask peers that can't hold any new shares about existing + # shares that they might have for our SI. Once this is done, we + # start placing the shares that we haven't already accounted + # for. + ds = [] + if self._status and self.readonly_peers: + self._status.set_status("Contacting readonly peers to find " + "any existing shares") + for peer in self.readonly_peers: assert isinstance(peer, PeerTracker) - d = peer.query_allocated() + d = peer.ask_about_existing_shares() d.addBoth(self._handle_existing_response, peer.peerid) + ds.append(d) self.num_peers_contacted += 1 self.query_count += 1 - log.msg("asking peer %s for any existing shares for upload id %s" + log.msg("asking peer %s for any existing shares for " + "upload id %s" % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id), level=log.NOISY, parent=self._log_parent) - if self._status: - self._status.set_status("Contacting Peer %s to find " - "any existing shares" - % idlib.shortnodeid_b2a(peer.peerid)) - return d + dl = defer.DeferredList(ds) + dl.addCallback(lambda ign: self._loop()) + return dl + def _handle_existing_response(self, res, peer): + """ + I handle responses to the queries sent by + Tahoe2PeerSelector._existing_shares. + """ if isinstance(res, failure.Failure): log.msg("%s got error during existing shares check: %s" % (idlib.shortnodeid_b2a(peer), res), @@ -315,18 +274,17 @@ class Tahoe2PeerSelector: else: buckets = res if buckets: - self.peers_with_shares.append(peer) + self.peers_with_shares.add(peer) log.msg("response from peer %s: alreadygot=%s" % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))), level=log.NOISY, parent=self._log_parent) for bucket in buckets: - if should_add_server(self.preexisting_shares, peer, bucket): - self.preexisting_shares[bucket] = peer - if self.homeless_shares and bucket in self.homeless_shares: - self.homeless_shares.remove(bucket) + self.preexisting_shares.setdefault(bucket, set()).add(peer) + if self.homeless_shares and bucket in self.homeless_shares: + self.homeless_shares.remove(bucket) self.full_count += 1 self.bad_query_count += 1 - return self._existing_shares() + def _get_progress_message(self): if not self.homeless_shares: @@ -350,16 +308,20 @@ class Tahoe2PeerSelector: def _loop(self): if not self.homeless_shares: - effective_happiness = servers_with_unique_shares( - self.preexisting_shares, - self.use_peers) - if self.servers_of_happiness <= len(effective_happiness): + merged = merge_peers(self.preexisting_shares, self.use_peers) + effective_happiness = servers_of_happiness(merged) + if self.servers_of_happiness <= effective_happiness: msg = ("peer selection successful for %s: %s" % (self, self._get_progress_message())) log.msg(msg, parent=self._log_parent) return (self.use_peers, self.preexisting_shares) else: - delta = self.servers_of_happiness - len(effective_happiness) + # We're not okay right now, but maybe we can fix it by + # redistributing some shares. In cases where one or two + # servers has, before the upload, all or most of the + # shares for a given SI, this can work by allowing _loop + # a chance to spread those out over the other peers, + delta = self.servers_of_happiness - effective_happiness shares = shares_by_server(self.preexisting_shares) # Each server in shares maps to a set of shares stored on it. # Since we want to keep at least one share on each server @@ -371,60 +333,32 @@ class Tahoe2PeerSelector: in shares.items()]) if delta <= len(self.uncontacted_peers) and \ shares_to_spread >= delta: - # Loop through the allocated shares, removing - # one from each server that has more than one and putting - # it back into self.homeless_shares until we've done - # this delta times. items = shares.items() while len(self.homeless_shares) < delta: - servernum, sharelist = items.pop() + # Loop through the allocated shares, removing + # one from each server that has more than one + # and putting it back into self.homeless_shares + # until we've done this delta times. + server, sharelist = items.pop() if len(sharelist) > 1: share = sharelist.pop() self.homeless_shares.append(share) - del(self.preexisting_shares[share]) - items.append((servernum, sharelist)) + self.preexisting_shares[share].remove(server) + if not self.preexisting_shares[share]: + del self.preexisting_shares[share] + items.append((server, sharelist)) return self._loop() else: - peer_count = len(list(set(self.peers_with_shares))) + # Redistribution won't help us; fail. + peer_count = len(self.peers_with_shares) # If peer_count < needed_shares, then the second error # message is nonsensical, so we use this one. - if peer_count < self.needed_shares: - msg = ("shares could only be placed or found on %d " - "server(s). " - "We were asked to place shares on at least %d " - "server(s) such that any %d of them have " - "enough shares to recover the file." % - (peer_count, - self.servers_of_happiness, - self.needed_shares)) - # Otherwise, if we've placed on at least needed_shares - # peers, but there isn't an x-happy subset of those peers - # for x < needed_shares, we use this error message. - elif len(effective_happiness) < self.needed_shares: - msg = ("shares could be placed or found on %d " - "server(s), but they are not spread out evenly " - "enough to ensure that any %d of these servers " - "would have enough shares to recover the file. " - "We were asked to place " - "shares on at least %d servers such that any " - "%d of them have enough shares to recover the " - "file." % - (peer_count, - self.needed_shares, - self.servers_of_happiness, - self.needed_shares)) - # Otherwise, if there is an x-happy subset of peers where - # x >= needed_shares, but x < shares_of_happiness, then - # we use this message. - else: - msg = ("shares could only be placed on %d server(s) " - "such that any %d of them have enough shares " - "to recover the file, but we were asked to use " - "at least %d such servers." % - (len(effective_happiness), - self.needed_shares, - self.servers_of_happiness)) - raise UploadUnhappinessError(msg) + msg = failure_message(peer_count, + self.needed_shares, + self.servers_of_happiness, + effective_happiness) + raise UploadUnhappinessError("%s (%s)" % (msg, + self._get_progress_message())) if self.uncontacted_peers: peer = self.uncontacted_peers.pop(0) @@ -473,11 +407,15 @@ class Tahoe2PeerSelector: else: # no more peers. If we haven't placed enough shares, we fail. placed_shares = self.total_shares - len(self.homeless_shares) - effective_happiness = servers_with_unique_shares( - self.preexisting_shares, - self.use_peers) - if len(effective_happiness) < self.servers_of_happiness: - msg = ("peer selection failed for %s: %s" % (self, + merged = merge_peers(self.preexisting_shares, self.use_peers) + effective_happiness = servers_of_happiness(merged) + if effective_happiness < self.servers_of_happiness: + msg = failure_message(len(self.peers_with_shares), + self.needed_shares, + self.servers_of_happiness, + effective_happiness) + msg = ("peer selection failed for %s: %s (%s)" % (self, + msg, self._get_progress_message())) if self.last_failure_msg: msg += " (%s)" % (self.last_failure_msg,) @@ -519,11 +457,12 @@ class Tahoe2PeerSelector: level=log.NOISY, parent=self._log_parent) progress = False for s in alreadygot: - if should_add_server(self.preexisting_shares, - peer.peerid, s): - self.preexisting_shares[s] = peer.peerid - if s in self.homeless_shares: - self.homeless_shares.remove(s) + self.preexisting_shares.setdefault(s, set()).add(peer.peerid) + if s in self.homeless_shares: + self.homeless_shares.remove(s) + progress = True + elif s in shares_to_ask: + progress = True # the PeerTracker will remember which shares were allocated on # that peer. We just have to remember to use them. @@ -532,14 +471,16 @@ class Tahoe2PeerSelector: progress = True if allocated or alreadygot: - self.peers_with_shares.append(peer.peerid) + self.peers_with_shares.add(peer.peerid) not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) if progress: - # they accepted or already had at least one share, so - # progress has been made + # They accepted at least one of the shares that we asked + # them to accept, or they had a share that we didn't ask + # them to accept but that we hadn't placed yet, so this + # was a productive query self.good_query_count += 1 else: self.bad_query_count += 1 @@ -938,8 +879,8 @@ class CHKUploader: def set_shareholders(self, (used_peers, already_peers), encoder): """ @param used_peers: a sequence of PeerTracker objects - @paran already_peers: a dict mapping sharenum to a peerid that - claims to already have this share + @paran already_peers: a dict mapping sharenum to a set of peerids + that claim to already have this share """ self.log("_send_shares, used_peers is %s" % (used_peers,)) # record already-present shares in self._results @@ -954,7 +895,7 @@ class CHKUploader: buckets.update(peer.buckets) for shnum in peer.buckets: self._peer_trackers[shnum] = peer - servermap[shnum] = peer.peerid + servermap.setdefault(shnum, set()).add(peer.peerid) assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]) encoder.set_shareholders(buckets, servermap) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index c2231e42..f325bb1f 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1345,7 +1345,8 @@ class IEncoder(Interface): must be a dictionary that maps share number (an integer ranging from 0 to n-1) to an instance that provides IStorageBucketWriter. 'servermap' is a dictionary that maps share number (as defined above) - to a peerid. This must be performed before start() can be called.""" + to a set of peerids. This must be performed before start() can be + called.""" def start(): """Begin the encode/upload process. This involves reading encrypted diff --git a/src/allmydata/util/happinessutil.py b/src/allmydata/util/happinessutil.py new file mode 100644 index 00000000..4c711290 --- /dev/null +++ b/src/allmydata/util/happinessutil.py @@ -0,0 +1,299 @@ +""" +I contain utilities useful for calculating servers_of_happiness, and for +reporting it in messages +""" + +def failure_message(peer_count, k, happy, effective_happy): + # If peer_count < needed_shares, this error message makes more + # sense than any of the others, so use it. + if peer_count < k: + msg = ("shares could be placed or found on only %d " + "server(s). " + "We were asked to place shares on at least %d " + "server(s) such that any %d of them have " + "enough shares to recover the file." % + (peer_count, happy, k)) + # Otherwise, if we've placed on at least needed_shares + # peers, but there isn't an x-happy subset of those peers + # for x >= needed_shares, we use this error message. + elif effective_happy < k: + msg = ("shares could be placed or found on %d " + "server(s), but they are not spread out evenly " + "enough to ensure that any %d of these servers " + "would have enough shares to recover the file. " + "We were asked to place " + "shares on at least %d servers such that any " + "%d of them have enough shares to recover the " + "file." % + (peer_count, k, happy, k)) + # Otherwise, if there is an x-happy subset of peers where + # x >= needed_shares, but x < servers_of_happiness, then + # we use this message. + else: + msg = ("shares could be placed on only %d server(s) " + "such that any %d of them have enough shares " + "to recover the file, but we were asked to " + "place shares on at least %d such servers." % + (effective_happy, k, happy)) + return msg + + +def shares_by_server(servermap): + """ + I accept a dict of shareid -> set(peerid) mappings, and return a + dict of peerid -> set(shareid) mappings. My argument is a dictionary + with sets of peers, indexed by shares, and I transform that into a + dictionary of sets of shares, indexed by peerids. + """ + ret = {} + for shareid, peers in servermap.iteritems(): + assert isinstance(peers, set) + for peerid in peers: + ret.setdefault(peerid, set()).add(shareid) + return ret + +def merge_peers(servermap, used_peers=None): + """ + I accept a dict of shareid -> set(peerid) mappings, and optionally a + set of PeerTrackers. If no set of PeerTrackers is provided, I return + my first argument unmodified. Otherwise, I update a copy of my first + argument to include the shareid -> peerid mappings implied in the + set of PeerTrackers, returning the resulting dict. + """ + if not used_peers: + return servermap + + assert(isinstance(servermap, dict)) + assert(isinstance(used_peers, set)) + + # Since we mutate servermap, and are called outside of a + # context where it is okay to do that, make a copy of servermap and + # work with it. + servermap = servermap.copy() + for peer in used_peers: + for shnum in peer.buckets: + servermap.setdefault(shnum, set()).add(peer.peerid) + return servermap + +def servers_of_happiness(sharemap): + """ + I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I + return the 'servers_of_happiness' number that sharemap results in. + + To calculate the 'servers_of_happiness' number for the sharemap, I + construct a bipartite graph with servers in one partition of vertices + and shares in the other, and with an edge between a server s and a share t + if s is to store t. I then compute the size of a maximum matching in + the resulting graph; this is then returned as the 'servers_of_happiness' + for my arguments. + + For example, consider the following layout: + + server 1: shares 1, 2, 3, 4 + server 2: share 6 + server 3: share 3 + server 4: share 4 + server 5: share 2 + + From this, we can construct the following graph: + + L = {server 1, server 2, server 3, server 4, server 5} + R = {share 1, share 2, share 3, share 4, share 6} + V = L U R + E = {(server 1, share 1), (server 1, share 2), (server 1, share 3), + (server 1, share 4), (server 2, share 6), (server 3, share 3), + (server 4, share 4), (server 5, share 2)} + G = (V, E) + + Note that G is bipartite since every edge in e has one endpoint in L + and one endpoint in R. + + A matching in a graph G is a subset M of E such that, for any vertex + v in V, v is incident to at most one edge of M. A maximum matching + in G is a matching that is no smaller than any other matching. For + this graph, a matching of cardinality 5 is: + + M = {(server 1, share 1), (server 2, share 6), + (server 3, share 3), (server 4, share 4), + (server 5, share 2)} + + Since G is bipartite, and since |L| = 5, we cannot have an M' such + that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and + as long as k <= 5, we can see that the layout above has + servers_of_happiness = 5, which matches the results here. + """ + if sharemap == {}: + return 0 + sharemap = shares_by_server(sharemap) + graph = flow_network_for(sharemap) + # This is an implementation of the Ford-Fulkerson method for finding + # a maximum flow in a flow network applied to a bipartite graph. + # Specifically, it is the Edmonds-Karp algorithm, since it uses a + # BFS to find the shortest augmenting path at each iteration, if one + # exists. + # + # The implementation here is an adapation of an algorithm described in + # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. + dim = len(graph) + flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] + residual_graph, residual_function = residual_network(graph, flow_function) + while augmenting_path_for(residual_graph): + path = augmenting_path_for(residual_graph) + # Delta is the largest amount that we can increase flow across + # all of the edges in path. Because of the way that the residual + # function is constructed, f[u][v] for a particular edge (u, v) + # is the amount of unused capacity on that edge. Taking the + # minimum of a list of those values for each edge in the + # augmenting path gives us our delta. + delta = min(map(lambda (u, v): residual_function[u][v], path)) + for (u, v) in path: + flow_function[u][v] += delta + flow_function[v][u] -= delta + residual_graph, residual_function = residual_network(graph, + flow_function) + num_servers = len(sharemap) + # The value of a flow is the total flow out of the source vertex + # (vertex 0, in our graph). We could just as well sum across all of + # f[0], but we know that vertex 0 only has edges to the servers in + # our graph, so we can stop after summing flow across those. The + # value of a flow computed in this way is the size of a maximum + # matching on the bipartite graph described above. + return sum([flow_function[0][v] for v in xrange(1, num_servers+1)]) + +def flow_network_for(sharemap): + """ + I take my argument, a dict of peerid -> set(shareid) mappings, and + turn it into a flow network suitable for use with Edmonds-Karp. I + then return the adjacency list representation of that network. + + Specifically, I build G = (V, E), where: + V = { peerid in sharemap } U { shareid in sharemap } U {s, t} + E = {(s, peerid) for each peerid} + U {(peerid, shareid) if peerid is to store shareid } + U {(shareid, t) for each shareid} + + s and t will be source and sink nodes when my caller starts treating + the graph I return like a flow network. Without s and t, the + returned graph is bipartite. + """ + # Servers don't have integral identifiers, and we can't make any + # assumptions about the way shares are indexed -- it's possible that + # there are missing shares, for example. So before making a graph, + # we re-index so that all of our vertices have integral indices, and + # that there aren't any holes. We start indexing at 1, so that we + # can add a source node at index 0. + sharemap, num_shares = reindex(sharemap, base_index=1) + num_servers = len(sharemap) + graph = [] # index -> [index], an adjacency list + # Add an entry at the top (index 0) that has an edge to every server + # in sharemap + graph.append(sharemap.keys()) + # For each server, add an entry that has an edge to every share that it + # contains (or will contain). + for k in sharemap: + graph.append(sharemap[k]) + # For each share, add an entry that has an edge to the sink. + sink_num = num_servers + num_shares + 1 + for i in xrange(num_shares): + graph.append([sink_num]) + # Add an empty entry for the sink, which has no outbound edges. + graph.append([]) + return graph + +def reindex(sharemap, base_index): + """ + Given sharemap, I map peerids and shareids to integers that don't + conflict with each other, so they're useful as indices in a graph. I + return a sharemap that is reindexed appropriately, and also the + number of distinct shares in the resulting sharemap as a convenience + for my caller. base_index tells me where to start indexing. + """ + shares = {} # shareid -> vertex index + num = base_index + ret = {} # peerid -> [shareid], a reindexed sharemap. + # Number the servers first + for k in sharemap: + ret[num] = sharemap[k] + num += 1 + # Number the shares + for k in ret: + for shnum in ret[k]: + if not shares.has_key(shnum): + shares[shnum] = num + num += 1 + ret[k] = map(lambda x: shares[x], ret[k]) + return (ret, len(shares)) + +def residual_network(graph, f): + """ + I return the residual network and residual capacity function of the + flow network represented by my graph and f arguments. graph is a + flow network in adjacency-list form, and f is a flow in graph. + """ + new_graph = [[] for i in xrange(len(graph))] + cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))] + for i in xrange(len(graph)): + for v in graph[i]: + if f[i][v] == 1: + # We add an edge (v, i) with cf[v,i] = 1. This means + # that we can remove 1 unit of flow from the edge (i, v) + new_graph[v].append(i) + cf[v][i] = 1 + cf[i][v] = -1 + else: + # We add the edge (i, v), since we're not using it right + # now. + new_graph[i].append(v) + cf[i][v] = 1 + cf[v][i] = -1 + return (new_graph, cf) + +def augmenting_path_for(graph): + """ + I return an augmenting path, if there is one, from the source node + to the sink node in the flow network represented by my graph argument. + If there is no augmenting path, I return False. I assume that the + source node is at index 0 of graph, and the sink node is at the last + index. I also assume that graph is a flow network in adjacency list + form. + """ + bfs_tree = bfs(graph, 0) + if bfs_tree[len(graph) - 1]: + n = len(graph) - 1 + path = [] # [(u, v)], where u and v are vertices in the graph + while n != 0: + path.insert(0, (bfs_tree[n], n)) + n = bfs_tree[n] + return path + return False + +def bfs(graph, s): + """ + Perform a BFS on graph starting at s, where graph is a graph in + adjacency list form, and s is a node in graph. I return the + predecessor table that the BFS generates. + """ + # This is an adaptation of the BFS described in "Introduction to + # Algorithms", Cormen et al, 2nd ed., p. 532. + # WHITE vertices are those that we haven't seen or explored yet. + WHITE = 0 + # GRAY vertices are those we have seen, but haven't explored yet + GRAY = 1 + # BLACK vertices are those we have seen and explored + BLACK = 2 + color = [WHITE for i in xrange(len(graph))] + predecessor = [None for i in xrange(len(graph))] + distance = [-1 for i in xrange(len(graph))] + queue = [s] # vertices that we haven't explored yet. + color[s] = GRAY + distance[s] = 0 + while queue: + n = queue.pop(0) + for v in graph[n]: + if color[v] == WHITE: + color[v] = GRAY + distance[v] = distance[n] + 1 + predecessor[v] = n + queue.append(v) + color[n] = BLACK + return predecessor