]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Fix up the behavior of #778, per reviewers' comments
authorKevan Carstensen <kevan@isnotajoke.com>
Fri, 14 May 2010 00:49:17 +0000 (17:49 -0700)
committerKevan Carstensen <kevan@isnotajoke.com>
Fri, 14 May 2010 00:49:17 +0000 (17:49 -0700)
  - Make some important utility functions clearer and more thoroughly
    documented.
  - Assert in upload.servers_of_happiness that the buckets attributes
    of PeerTrackers passed to it are mutually disjoint.
  - Get rid of some silly non-Pythonisms that I didn't see when I first
    wrote these patches.
  - Make sure that should_add_server returns true when queried about a
    shnum that it doesn't know about yet.
  - Change Tahoe2PeerSelector.preexisting_shares to map a shareid to a set
    of peerids, alter dependencies to deal with that.
  - Remove upload.should_add_servers, because it is no longer necessary
  - Move upload.shares_of_happiness and upload.shares_by_server to a utility
    file.
  - Change some points in Tahoe2PeerSelector.
  - Compute servers_of_happiness using a bipartite matching algorithm that
    we know is optimal instead of an ad-hoc greedy algorithm that isn't.
  - Change servers_of_happiness to just take a sharemap as an argument,
    change its callers to merge existing_shares and used_peers before
    calling it.
  - Change an error message in the encoder to be more appropriate for
    servers of happiness.
  - Clarify the wording of an error message in immutable/upload.py
  - Refactor a happiness failure message to happinessutil.py, and make
    immutable/upload.py and immutable/encode.py use it.
  - Move the word "only" as far to the right as possible in failure
    messages.
  - Use a better definition of progress during peer selection.
  - Do read-only peer share detection queries in parallel, not sequentially.
  - Clean up logging semantics; print the query statistics whenever an
    upload is unsuccessful, not just in one case.

src/allmydata/immutable/encode.py
src/allmydata/immutable/layout.py
src/allmydata/immutable/upload.py
src/allmydata/interfaces.py
src/allmydata/util/happinessutil.py [new file with mode: 0644]

index f6be4b023554ddc3e7977c9f61852ca2c3d603fa..c44744862798bd035b937e2dc7a394940ffb34d7 100644 (file)
@@ -7,7 +7,7 @@ from foolscap.api import fireEventually
 from allmydata import uri
 from allmydata.storage.server import si_b2a
 from allmydata.hashtree import HashTree
-from allmydata.util import mathutil, hashutil, base32, log
+from allmydata.util import mathutil, hashutil, base32, log, happinessutil
 from allmydata.util.assertutil import _assert, precondition
 from allmydata.codec import CRSEncoder
 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
@@ -198,6 +198,8 @@ class Encoder(object):
             assert IStorageBucketWriter.providedBy(landlords[k])
         self.landlords = landlords.copy()
         assert isinstance(servermap, dict)
+        for v in servermap.itervalues():
+            assert isinstance(v, set)
         self.servermap = servermap.copy()
 
     def start(self):
@@ -484,26 +486,33 @@ class Encoder(object):
                       level=log.UNUSUAL, failure=why)
         if shareid in self.landlords:
             self.landlords[shareid].abort()
+            peerid = self.landlords[shareid].get_peerid()
+            assert peerid
             del self.landlords[shareid]
+            self.servermap[shareid].remove(peerid)
+            if not self.servermap[shareid]:
+                del self.servermap[shareid]
         else:
             # even more UNUSUAL
             self.log("they weren't in our list of landlords", parent=ln,
                      level=log.WEIRD, umid="TQGFRw")
-        del(self.servermap[shareid])
-        servers_left = list(set(self.servermap.values()))
-        if len(servers_left) < self.servers_of_happiness:
-            msg = "lost too many servers during upload (still have %d, want %d): %s" % \
-                  (len(servers_left),
-                   self.servers_of_happiness, why)
+        happiness = happinessutil.servers_of_happiness(self.servermap)
+        if happiness < self.servers_of_happiness:
+            peerids = set(happinessutil.shares_by_server(self.servermap).keys())
+            msg = happinessutil.failure_message(len(peerids),
+                                                self.required_shares,
+                                                self.servers_of_happiness,
+                                                happiness)
+            msg = "%s: %s" % (msg, why)
             raise UploadUnhappinessError(msg)
         self.log("but we can still continue with %s shares, we'll be happy "
-                 "with at least %s" % (len(servers_left),
+                 "with at least %s" % (happiness,
                                        self.servers_of_happiness),
                  parent=ln)
 
     def _gather_responses(self, dl):
         d = defer.DeferredList(dl, fireOnOneErrback=True)
-        def _eatNotEnoughSharesError(f):
+        def _eatUploadUnhappinessError(f):
             # all exceptions that occur while talking to a peer are handled
             # in _remove_shareholder. That might raise UploadUnhappinessError,
             # which will cause the DeferredList to errback but which should
@@ -513,7 +522,7 @@ class Encoder(object):
             f.trap(UploadUnhappinessError)
             return None
         for d0 in dl:
-            d0.addErrback(_eatNotEnoughSharesError)
+            d0.addErrback(_eatUploadUnhappinessError)
         return d
 
     def finish_hashing(self):
index 6ca533910b94f004bbdd81e32d1b21b8970f0e67..6e07da7be2e9bc3f092d417e3cc7a16fe48c153c 100644 (file)
@@ -242,6 +242,12 @@ class WriteBucketProxy:
     def abort(self):
         return self._rref.callRemoteOnly("abort")
 
+
+    def get_peerid(self):
+        if self._nodeid:
+            return self._nodeid
+        return None
+
 class WriteBucketProxy_v2(WriteBucketProxy):
     fieldsize = 8
     fieldstruct = ">Q"
index ca57cd91956173713caec40ce52fe63cedc9d552..4beb66f5ce26674803593252741c8d8bcdd0af6a 100644 (file)
@@ -13,6 +13,9 @@ from allmydata import hashtree, uri
 from allmydata.storage.server import si_b2a
 from allmydata.immutable import encode
 from allmydata.util import base32, dictutil, idlib, log, mathutil
+from allmydata.util.happinessutil import servers_of_happiness, \
+                                         shares_by_server, merge_peers, \
+                                         failure_message
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@@ -113,10 +116,9 @@ class PeerTracker:
         d.addCallback(self._got_reply)
         return d
 
-    def query_allocated(self):
-        d = self._storageserver.callRemote("get_buckets",
-                                           self.storage_index)
-        return d
+    def ask_about_existing_shares(self):
+        return self._storageserver.callRemote("get_buckets",
+                                              self.storage_index)
 
     def _got_reply(self, (alreadygot, buckets)):
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
@@ -132,52 +134,6 @@ class PeerTracker:
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
 
-def servers_with_unique_shares(existing_shares, used_peers=None):
-    """
-    I accept a dict of shareid -> peerid mappings (and optionally a list
-    of PeerTracker instances) and return a list of servers that have shares.
-    """
-    servers = []
-    existing_shares = existing_shares.copy()
-    if used_peers:
-        peerdict = {}
-        for peer in used_peers:
-            peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
-        for k in peerdict.keys():
-            if existing_shares.has_key(k):
-                # Prevent overcounting; favor the bucket, and not the 
-                # prexisting share.
-                del(existing_shares[k])
-        peers = list(used_peers.copy())
-        # We do this because the preexisting shares list goes by peerid.
-        peers = [x.peerid for x in peers]
-        servers.extend(peers)
-    servers.extend(existing_shares.values())
-    return list(set(servers))
-
-def shares_by_server(existing_shares):
-    """
-    I accept a dict of shareid -> peerid mappings, and return a dict
-    of peerid -> shareid mappings
-    """
-    servers = {}
-    for server in set(existing_shares.values()):
-        servers[server] = set([x for x in existing_shares.keys()
-                               if existing_shares[x] == server])
-    return servers
-
-def should_add_server(existing_shares, server, bucket):
-    """
-    I tell my caller whether the servers_of_happiness number will be
-    increased or decreased if a particular server is added as the peer
-    already holding a particular share. I take a dictionary, a peerid,
-    and a bucket as arguments, and return a boolean.
-    """
-    old_size = len(servers_with_unique_shares(existing_shares))
-    new_candidate = existing_shares.copy()
-    new_candidate[bucket] = server
-    new_size = len(servers_with_unique_shares(new_candidate))
-    return old_size < new_size
 
 class Tahoe2PeerSelector:
 
@@ -203,8 +159,8 @@ class Tahoe2PeerSelector:
         @return: (used_peers, already_peers), where used_peers is a set of
                  PeerTracker instances that have agreed to hold some shares
                  for us (the shnum is stashed inside the PeerTracker),
-                 and already_peers is a dict mapping shnum to a peer
-                 which claims to already have the share.
+                 and already_peers is a dict mapping shnum to a set of peers
+                 which claim to already have the share.
         """
 
         if self._status:
@@ -215,25 +171,21 @@ class Tahoe2PeerSelector:
         self.needed_shares = needed_shares
 
         self.homeless_shares = range(total_shares)
-        # self.uncontacted_peers = list() # peers we haven't asked yet
         self.contacted_peers = [] # peers worth asking again
         self.contacted_peers2 = [] # peers that we have asked again
         self._started_second_pass = False
         self.use_peers = set() # PeerTrackers that have shares assigned to them
-        self.preexisting_shares = {} # sharenum -> peerid holding the share
-        # We don't try to allocate shares to these servers, since they've 
-        # said that they're incapable of storing shares of the size that 
-        # we'd want to store. We keep them around because they may have
-        # existing shares for this storage index, which we want to know
-        # about for accurate servers_of_happiness accounting
-        self.readonly_peers = []
-        # These peers have shares -- any shares -- for our SI. We keep track
-        # of these to write an error message with them later.
-        self.peers_with_shares = []
-
-        peers = storage_broker.get_servers_for_index(storage_index)
-        if not peers:
-            raise NoServersError("client gave us zero peers")
+        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
+        # We don't try to allocate shares to these servers, since they've said
+        # that they're incapable of storing shares of the size that we'd want
+        # to store. We keep them around because they may have existing shares
+        # for this storage index, which we want to know about for accurate
+        # servers_of_happiness accounting
+        # (this is eventually a list, but it is initialized later)
+        self.readonly_peers = None
+        # These peers have shares -- any shares -- for our SI. We keep
+        # track of these to write an error message with them later.
+        self.peers_with_shares = set()
 
         # this needed_hashes computation should mirror
         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
@@ -247,6 +199,9 @@ class Tahoe2PeerSelector:
                                              num_share_hashes, EXTENSION_SIZE,
                                              None)
         allocated_size = wbp.get_allocated_size()
+        all_peers = storage_broker.get_servers_for_index(storage_index)
+        if not all_peers:
+            raise NoServersError("client gave us zero peers")
 
         # filter the list of peers according to which ones can accomodate
         # this request. This excludes older peers (which used a 4-byte size
@@ -256,10 +211,9 @@ class Tahoe2PeerSelector:
             (peerid, conn) = peer
             v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             return v1["maximum-immutable-share-size"]
-        new_peers = [peer for peer in peers
-                     if _get_maxsize(peer) >= allocated_size]
-        old_peers = list(set(peers).difference(set(new_peers)))
-        peers = new_peers
+        writable_peers = [peer for peer in all_peers
+                          if _get_maxsize(peer) >= allocated_size]
+        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
 
         # decide upon the renewal/cancel secrets, to include them in the
         # allocate_buckets query.
@@ -271,41 +225,46 @@ class Tahoe2PeerSelector:
         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                      storage_index)
         def _make_trackers(peers):
-           return [ PeerTracker(peerid, conn,
-                                share_size, block_size,
-                                num_segments, num_share_hashes,
-                                storage_index,
-                                bucket_renewal_secret_hash(file_renewal_secret,
-                                                           peerid),
-                                bucket_cancel_secret_hash(file_cancel_secret,
-                                                          peerid))
+           return [PeerTracker(peerid, conn,
+                               share_size, block_size,
+                               num_segments, num_share_hashes,
+                               storage_index,
+                               bucket_renewal_secret_hash(file_renewal_secret,
+                                                          peerid),
+                               bucket_cancel_secret_hash(file_cancel_secret,
+                                                         peerid))
                     for (peerid, conn) in peers]
-        self.uncontacted_peers = _make_trackers(peers)
-        self.readonly_peers = _make_trackers(old_peers)
-        # Talk to the readonly servers to get an idea of what servers
-        # have what shares (if any) for this storage index
-        d = defer.maybeDeferred(self._existing_shares)
-        d.addCallback(lambda ign: self._loop())
-        return d
-
-    def _existing_shares(self):
-        if self.readonly_peers:
-            peer = self.readonly_peers.pop()
+        self.uncontacted_peers = _make_trackers(writable_peers)
+        self.readonly_peers = _make_trackers(readonly_peers)
+        # We now ask peers that can't hold any new shares about existing
+        # shares that they might have for our SI. Once this is done, we
+        # start placing the shares that we haven't already accounted
+        # for.
+        ds = []
+        if self._status and self.readonly_peers:
+            self._status.set_status("Contacting readonly peers to find "
+                                    "any existing shares")
+        for peer in self.readonly_peers:
             assert isinstance(peer, PeerTracker)
-            d = peer.query_allocated()
+            d = peer.ask_about_existing_shares()
             d.addBoth(self._handle_existing_response, peer.peerid)
+            ds.append(d)
             self.num_peers_contacted += 1
             self.query_count += 1
-            log.msg("asking peer %s for any existing shares for upload id %s"
+            log.msg("asking peer %s for any existing shares for "
+                    "upload id %s"
                     % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                     level=log.NOISY, parent=self._log_parent)
-            if self._status:
-                self._status.set_status("Contacting Peer %s to find "
-                                        "any existing shares"
-                                        % idlib.shortnodeid_b2a(peer.peerid))
-            return d
+        dl = defer.DeferredList(ds)
+        dl.addCallback(lambda ign: self._loop())
+        return dl
+
 
     def _handle_existing_response(self, res, peer):
+        """
+        I handle responses to the queries sent by
+        Tahoe2PeerSelector._existing_shares.
+        """
         if isinstance(res, failure.Failure):
             log.msg("%s got error during existing shares check: %s"
                     % (idlib.shortnodeid_b2a(peer), res),
@@ -315,18 +274,17 @@ class Tahoe2PeerSelector:
         else:
             buckets = res
             if buckets:
-                self.peers_with_shares.append(peer)
+                self.peers_with_shares.add(peer)
             log.msg("response from peer %s: alreadygot=%s"
                     % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                     level=log.NOISY, parent=self._log_parent)
             for bucket in buckets:
-                if should_add_server(self.preexisting_shares, peer, bucket):
-                    self.preexisting_shares[bucket] = peer
-                    if self.homeless_shares and bucket in self.homeless_shares:
-                        self.homeless_shares.remove(bucket)
+                self.preexisting_shares.setdefault(bucket, set()).add(peer)
+                if self.homeless_shares and bucket in self.homeless_shares:
+                    self.homeless_shares.remove(bucket)
             self.full_count += 1
             self.bad_query_count += 1
-        return self._existing_shares()
+
 
     def _get_progress_message(self):
         if not self.homeless_shares:
@@ -350,16 +308,20 @@ class Tahoe2PeerSelector:
 
     def _loop(self):
         if not self.homeless_shares:
-            effective_happiness = servers_with_unique_shares(
-                                                   self.preexisting_shares,
-                                                   self.use_peers)
-            if self.servers_of_happiness <= len(effective_happiness):
+            merged = merge_peers(self.preexisting_shares, self.use_peers)
+            effective_happiness = servers_of_happiness(merged)
+            if self.servers_of_happiness <= effective_happiness:
                 msg = ("peer selection successful for %s: %s" % (self,
                             self._get_progress_message()))
                 log.msg(msg, parent=self._log_parent)
                 return (self.use_peers, self.preexisting_shares)
             else:
-                delta = self.servers_of_happiness - len(effective_happiness)
+                # We're not okay right now, but maybe we can fix it by
+                # redistributing some shares. In cases where one or two
+                # servers has, before the upload, all or most of the
+                # shares for a given SI, this can work by allowing _loop
+                # a chance to spread those out over the other peers, 
+                delta = self.servers_of_happiness - effective_happiness
                 shares = shares_by_server(self.preexisting_shares)
                 # Each server in shares maps to a set of shares stored on it.
                 # Since we want to keep at least one share on each server 
@@ -371,60 +333,32 @@ class Tahoe2PeerSelector:
                                         in shares.items()])
                 if delta <= len(self.uncontacted_peers) and \
                    shares_to_spread >= delta:
-                    # Loop through the allocated shares, removing 
-                    # one from each server that has more than one and putting
-                    # it back into self.homeless_shares until we've done
-                    # this delta times.
                     items = shares.items()
                     while len(self.homeless_shares) < delta:
-                        servernum, sharelist = items.pop()
+                        # Loop through the allocated shares, removing
+                        # one from each server that has more than one
+                        # and putting it back into self.homeless_shares
+                        # until we've done this delta times.
+                        server, sharelist = items.pop()
                         if len(sharelist) > 1:
                             share = sharelist.pop()
                             self.homeless_shares.append(share)
-                            del(self.preexisting_shares[share])
-                            items.append((servernum, sharelist))
+                            self.preexisting_shares[share].remove(server)
+                            if not self.preexisting_shares[share]:
+                                del self.preexisting_shares[share]
+                            items.append((server, sharelist))
                     return self._loop()
                 else:
-                    peer_count = len(list(set(self.peers_with_shares)))
+                    # Redistribution won't help us; fail.
+                    peer_count = len(self.peers_with_shares)
                     # If peer_count < needed_shares, then the second error
                     # message is nonsensical, so we use this one.
-                    if peer_count < self.needed_shares:
-                        msg = ("shares could only be placed or found on %d "
-                               "server(s). "
-                               "We were asked to place shares on at least %d "
-                               "server(s) such that any %d of them have "
-                               "enough shares to recover the file." %
-                               (peer_count,
-                                self.servers_of_happiness,
-                                self.needed_shares))
-                    # Otherwise, if we've placed on at least needed_shares
-                    # peers, but there isn't an x-happy subset of those peers
-                    # for x < needed_shares, we use this error message.
-                    elif len(effective_happiness) < self.needed_shares:
-                        msg = ("shares could be placed or found on %d "
-                               "server(s), but they are not spread out evenly "
-                               "enough to ensure that any %d of these servers "
-                               "would have enough shares to recover the file. "
-                               "We were asked to place "
-                               "shares on at least %d servers such that any "
-                               "%d of them have enough shares to recover the "
-                               "file." %
-                               (peer_count,
-                                self.needed_shares,
-                                self.servers_of_happiness,
-                                self.needed_shares))
-                    # Otherwise, if there is an x-happy subset of peers where
-                    # x >= needed_shares, but x < shares_of_happiness, then 
-                    # we use this message.
-                    else:
-                        msg = ("shares could only be placed on %d server(s) "
-                               "such that any %d of them have enough shares "
-                               "to recover the file, but we were asked to use "
-                               "at least %d such servers." %
-                                               (len(effective_happiness),
-                                                self.needed_shares,
-                                                self.servers_of_happiness))
-                    raise UploadUnhappinessError(msg)
+                    msg = failure_message(peer_count,
+                                          self.needed_shares,
+                                          self.servers_of_happiness,
+                                          effective_happiness)
+                    raise UploadUnhappinessError("%s (%s)" % (msg,
+                                                 self._get_progress_message()))
 
         if self.uncontacted_peers:
             peer = self.uncontacted_peers.pop(0)
@@ -473,11 +407,15 @@ class Tahoe2PeerSelector:
         else:
             # no more peers. If we haven't placed enough shares, we fail.
             placed_shares = self.total_shares - len(self.homeless_shares)
-            effective_happiness = servers_with_unique_shares(
-                                                   self.preexisting_shares,
-                                                   self.use_peers)
-            if len(effective_happiness) < self.servers_of_happiness:
-                msg = ("peer selection failed for %s: %s" % (self,
+            merged = merge_peers(self.preexisting_shares, self.use_peers)
+            effective_happiness = servers_of_happiness(merged)
+            if effective_happiness < self.servers_of_happiness:
+                msg = failure_message(len(self.peers_with_shares),
+                                      self.needed_shares,
+                                      self.servers_of_happiness,
+                                      effective_happiness)
+                msg = ("peer selection failed for %s: %s (%s)" % (self,
+                                msg,
                                 self._get_progress_message()))
                 if self.last_failure_msg:
                     msg += " (%s)" % (self.last_failure_msg,)
@@ -519,11 +457,12 @@ class Tahoe2PeerSelector:
                     level=log.NOISY, parent=self._log_parent)
             progress = False
             for s in alreadygot:
-                if should_add_server(self.preexisting_shares,
-                                     peer.peerid, s):
-                    self.preexisting_shares[s] = peer.peerid
-                    if s in self.homeless_shares:
-                        self.homeless_shares.remove(s)
+                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
+                if s in self.homeless_shares:
+                    self.homeless_shares.remove(s)
+                    progress = True
+                elif s in shares_to_ask:
+                    progress = True
 
             # the PeerTracker will remember which shares were allocated on
             # that peer. We just have to remember to use them.
@@ -532,14 +471,16 @@ class Tahoe2PeerSelector:
                 progress = True
 
             if allocated or alreadygot:
-                self.peers_with_shares.append(peer.peerid)
+                self.peers_with_shares.add(peer.peerid)
 
             not_yet_present = set(shares_to_ask) - set(alreadygot)
             still_homeless = not_yet_present - set(allocated)
 
             if progress:
-                # they accepted or already had at least one share, so
-                # progress has been made
+                # They accepted at least one of the shares that we asked
+                # them to accept, or they had a share that we didn't ask
+                # them to accept but that we hadn't placed yet, so this
+                # was a productive query
                 self.good_query_count += 1
             else:
                 self.bad_query_count += 1
@@ -938,8 +879,8 @@ class CHKUploader:
     def set_shareholders(self, (used_peers, already_peers), encoder):
         """
         @param used_peers: a sequence of PeerTracker objects
-        @paran already_peers: a dict mapping sharenum to a peerid that
-                              claims to already have this share
+        @paran already_peers: a dict mapping sharenum to a set of peerids
+                              that claim to already have this share
         """
         self.log("_send_shares, used_peers is %s" % (used_peers,))
         # record already-present shares in self._results
@@ -954,7 +895,7 @@ class CHKUploader:
             buckets.update(peer.buckets)
             for shnum in peer.buckets:
                 self._peer_trackers[shnum] = peer
-                servermap[shnum] = peer.peerid
+                servermap.setdefault(shnum, set()).add(peer.peerid)
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
         encoder.set_shareholders(buckets, servermap)
 
index c2231e42ea2df2b5b5736f746525a0e06ff27bf6..f325bb1f27e72712d42fc70ce94fb3dcab5d2168 100644 (file)
@@ -1345,7 +1345,8 @@ class IEncoder(Interface):
         must be a dictionary that maps share number (an integer ranging from
         0 to n-1) to an instance that provides IStorageBucketWriter.
         'servermap' is a dictionary that maps share number (as defined above)
-        to a peerid. This must be performed before start() can be called."""
+        to a set of peerids. This must be performed before start() can be
+        called."""
 
     def start():
         """Begin the encode/upload process. This involves reading encrypted
diff --git a/src/allmydata/util/happinessutil.py b/src/allmydata/util/happinessutil.py
new file mode 100644 (file)
index 0000000..4c71129
--- /dev/null
@@ -0,0 +1,299 @@
+"""
+I contain utilities useful for calculating servers_of_happiness, and for
+reporting it in messages
+"""
+
+def failure_message(peer_count, k, happy, effective_happy):
+    # If peer_count < needed_shares, this error message makes more
+    # sense than any of the others, so use it.
+    if peer_count < k:
+        msg = ("shares could be placed or found on only %d "
+               "server(s). "
+               "We were asked to place shares on at least %d "
+               "server(s) such that any %d of them have "
+               "enough shares to recover the file." %
+                (peer_count, happy, k))
+    # Otherwise, if we've placed on at least needed_shares
+    # peers, but there isn't an x-happy subset of those peers
+    # for x >= needed_shares, we use this error message.
+    elif effective_happy < k:
+        msg = ("shares could be placed or found on %d "
+               "server(s), but they are not spread out evenly "
+               "enough to ensure that any %d of these servers "
+               "would have enough shares to recover the file. "
+               "We were asked to place "
+               "shares on at least %d servers such that any "
+               "%d of them have enough shares to recover the "
+               "file." %
+                (peer_count, k, happy, k))
+    # Otherwise, if there is an x-happy subset of peers where
+    # x >= needed_shares, but x < servers_of_happiness, then 
+    # we use this message.
+    else:
+        msg = ("shares could be placed on only %d server(s) "
+               "such that any %d of them have enough shares "
+               "to recover the file, but we were asked to "
+               "place shares on at least %d such servers." %
+                (effective_happy, k, happy))
+    return msg
+
+
+def shares_by_server(servermap):
+    """
+    I accept a dict of shareid -> set(peerid) mappings, and return a
+    dict of peerid -> set(shareid) mappings. My argument is a dictionary
+    with sets of peers, indexed by shares, and I transform that into a
+    dictionary of sets of shares, indexed by peerids.
+    """
+    ret = {}
+    for shareid, peers in servermap.iteritems():
+        assert isinstance(peers, set)
+        for peerid in peers:
+            ret.setdefault(peerid, set()).add(shareid)
+    return ret
+
+def merge_peers(servermap, used_peers=None):
+    """
+    I accept a dict of shareid -> set(peerid) mappings, and optionally a
+    set of PeerTrackers. If no set of PeerTrackers is provided, I return
+    my first argument unmodified. Otherwise, I update a copy of my first
+    argument to include the shareid -> peerid mappings implied in the
+    set of PeerTrackers, returning the resulting dict.
+    """
+    if not used_peers:
+        return servermap
+
+    assert(isinstance(servermap, dict))
+    assert(isinstance(used_peers, set))
+
+    # Since we mutate servermap, and are called outside of a 
+    # context where it is okay to do that, make a copy of servermap and
+    # work with it.
+    servermap = servermap.copy()
+    for peer in used_peers:
+        for shnum in peer.buckets:
+            servermap.setdefault(shnum, set()).add(peer.peerid)
+    return servermap
+
+def servers_of_happiness(sharemap):
+    """
+    I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
+    return the 'servers_of_happiness' number that sharemap results in.
+
+    To calculate the 'servers_of_happiness' number for the sharemap, I
+    construct a bipartite graph with servers in one partition of vertices
+    and shares in the other, and with an edge between a server s and a share t
+    if s is to store t. I then compute the size of a maximum matching in
+    the resulting graph; this is then returned as the 'servers_of_happiness'
+    for my arguments.
+
+    For example, consider the following layout:
+
+      server 1: shares 1, 2, 3, 4
+      server 2: share 6
+      server 3: share 3
+      server 4: share 4
+      server 5: share 2
+
+    From this, we can construct the following graph:
+
+      L = {server 1, server 2, server 3, server 4, server 5}
+      R = {share 1, share 2, share 3, share 4, share 6}
+      V = L U R
+      E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
+           (server 1, share 4), (server 2, share 6), (server 3, share 3),
+           (server 4, share 4), (server 5, share 2)}
+      G = (V, E)
+
+    Note that G is bipartite since every edge in e has one endpoint in L
+    and one endpoint in R.
+
+    A matching in a graph G is a subset M of E such that, for any vertex
+    v in V, v is incident to at most one edge of M. A maximum matching
+    in G is a matching that is no smaller than any other matching. For
+    this graph, a matching of cardinality 5 is:
+
+      M = {(server 1, share 1), (server 2, share 6),
+           (server 3, share 3), (server 4, share 4),
+           (server 5, share 2)}
+
+    Since G is bipartite, and since |L| = 5, we cannot have an M' such
+    that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
+    as long as k <= 5, we can see that the layout above has
+    servers_of_happiness = 5, which matches the results here.
+    """
+    if sharemap == {}:
+        return 0
+    sharemap = shares_by_server(sharemap)
+    graph = flow_network_for(sharemap)
+    # This is an implementation of the Ford-Fulkerson method for finding
+    # a maximum flow in a flow network applied to a bipartite graph. 
+    # Specifically, it is the Edmonds-Karp algorithm, since it uses a 
+    # BFS to find the shortest augmenting path at each iteration, if one
+    # exists. 
+    # 
+    # The implementation here is an adapation of an algorithm described in 
+    # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. 
+    dim = len(graph)
+    flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
+    residual_graph, residual_function = residual_network(graph, flow_function)
+    while augmenting_path_for(residual_graph):
+        path = augmenting_path_for(residual_graph)
+        # Delta is the largest amount that we can increase flow across
+        # all of the edges in path. Because of the way that the residual
+        # function is constructed, f[u][v] for a particular edge (u, v)
+        # is the amount of unused capacity on that edge. Taking the
+        # minimum of a list of those values for each edge in the
+        # augmenting path gives us our delta.
+        delta = min(map(lambda (u, v): residual_function[u][v], path))
+        for (u, v) in path:
+            flow_function[u][v] += delta
+            flow_function[v][u] -= delta
+        residual_graph, residual_function = residual_network(graph,
+                                                             flow_function)
+    num_servers = len(sharemap)
+    # The value of a flow is the total flow out of the source vertex
+    # (vertex 0, in our graph). We could just as well sum across all of
+    # f[0], but we know that vertex 0 only has edges to the servers in
+    # our graph, so we can stop after summing flow across those. The
+    # value of a flow computed in this way is the size of a maximum
+    # matching on the bipartite graph described above.
+    return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
+
+def flow_network_for(sharemap):
+    """
+    I take my argument, a dict of peerid -> set(shareid) mappings, and
+    turn it into a flow network suitable for use with Edmonds-Karp. I
+    then return the adjacency list representation of that network.
+
+    Specifically, I build G = (V, E), where:
+      V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
+      E = {(s, peerid) for each peerid}
+          U {(peerid, shareid) if peerid is to store shareid }
+          U {(shareid, t) for each shareid}
+
+    s and t will be source and sink nodes when my caller starts treating
+    the graph I return like a flow network. Without s and t, the
+    returned graph is bipartite.
+    """
+    # Servers don't have integral identifiers, and we can't make any
+    # assumptions about the way shares are indexed -- it's possible that
+    # there are missing shares, for example. So before making a graph,
+    # we re-index so that all of our vertices have integral indices, and
+    # that there aren't any holes. We start indexing at 1, so that we
+    # can add a source node at index 0.
+    sharemap, num_shares = reindex(sharemap, base_index=1)
+    num_servers = len(sharemap)
+    graph = [] # index -> [index], an adjacency list
+    # Add an entry at the top (index 0) that has an edge to every server
+    # in sharemap 
+    graph.append(sharemap.keys())
+    # For each server, add an entry that has an edge to every share that it
+    # contains (or will contain).
+    for k in sharemap:
+        graph.append(sharemap[k])
+    # For each share, add an entry that has an edge to the sink.
+    sink_num = num_servers + num_shares + 1
+    for i in xrange(num_shares):
+        graph.append([sink_num])
+    # Add an empty entry for the sink, which has no outbound edges.
+    graph.append([])
+    return graph
+
+def reindex(sharemap, base_index):
+    """
+    Given sharemap, I map peerids and shareids to integers that don't
+    conflict with each other, so they're useful as indices in a graph. I
+    return a sharemap that is reindexed appropriately, and also the
+    number of distinct shares in the resulting sharemap as a convenience
+    for my caller. base_index tells me where to start indexing.
+    """
+    shares  = {} # shareid  -> vertex index
+    num = base_index
+    ret = {} # peerid -> [shareid], a reindexed sharemap.
+    # Number the servers first
+    for k in sharemap:
+        ret[num] = sharemap[k]
+        num += 1
+    # Number the shares
+    for k in ret:
+        for shnum in ret[k]:
+            if not shares.has_key(shnum):
+                shares[shnum] = num
+                num += 1
+        ret[k] = map(lambda x: shares[x], ret[k])
+    return (ret, len(shares))
+
+def residual_network(graph, f):
+    """
+    I return the residual network and residual capacity function of the
+    flow network represented by my graph and f arguments. graph is a
+    flow network in adjacency-list form, and f is a flow in graph.
+    """
+    new_graph = [[] for i in xrange(len(graph))]
+    cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
+    for i in xrange(len(graph)):
+        for v in graph[i]:
+            if f[i][v] == 1:
+                # We add an edge (v, i) with cf[v,i] = 1. This means
+                # that we can remove 1 unit of flow from the edge (i, v) 
+                new_graph[v].append(i)
+                cf[v][i] = 1
+                cf[i][v] = -1
+            else:
+                # We add the edge (i, v), since we're not using it right
+                # now.
+                new_graph[i].append(v)
+                cf[i][v] = 1
+                cf[v][i] = -1
+    return (new_graph, cf)
+
+def augmenting_path_for(graph):
+    """
+    I return an augmenting path, if there is one, from the source node
+    to the sink node in the flow network represented by my graph argument.
+    If there is no augmenting path, I return False. I assume that the
+    source node is at index 0 of graph, and the sink node is at the last
+    index. I also assume that graph is a flow network in adjacency list
+    form.
+    """
+    bfs_tree = bfs(graph, 0)
+    if bfs_tree[len(graph) - 1]:
+        n = len(graph) - 1
+        path = [] # [(u, v)], where u and v are vertices in the graph
+        while n != 0:
+            path.insert(0, (bfs_tree[n], n))
+            n = bfs_tree[n]
+        return path
+    return False
+
+def bfs(graph, s):
+    """
+    Perform a BFS on graph starting at s, where graph is a graph in
+    adjacency list form, and s is a node in graph. I return the
+    predecessor table that the BFS generates.
+    """
+    # This is an adaptation of the BFS described in "Introduction to
+    # Algorithms", Cormen et al, 2nd ed., p. 532.
+    # WHITE vertices are those that we haven't seen or explored yet.
+    WHITE = 0
+    # GRAY vertices are those we have seen, but haven't explored yet
+    GRAY  = 1
+    # BLACK vertices are those we have seen and explored
+    BLACK = 2
+    color        = [WHITE for i in xrange(len(graph))]
+    predecessor  = [None for i in xrange(len(graph))]
+    distance     = [-1 for i in xrange(len(graph))]
+    queue = [s] # vertices that we haven't explored yet.
+    color[s] = GRAY
+    distance[s] = 0
+    while queue:
+        n = queue.pop(0)
+        for v in graph[n]:
+            if color[v] == WHITE:
+                color[v] = GRAY
+                distance[v] = distance[n] + 1
+                predecessor[v] = n
+                queue.append(v)
+        color[n] = BLACK
+    return predecessor