from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
+from allmydata.util.happinessutil import servers_of_happiness, \
+ shares_by_server, merge_peers, \
+ failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
d.addCallback(self._got_reply)
return d
- def query_allocated(self):
- d = self._storageserver.callRemote("get_buckets",
- self.storage_index)
- return d
+ def ask_about_existing_shares(self):
+ return self._storageserver.callRemote("get_buckets",
+ self.storage_index)
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
self.buckets.update(b)
return (alreadygot, set(b.keys()))
-def servers_with_unique_shares(existing_shares, used_peers=None):
- """
- I accept a dict of shareid -> peerid mappings (and optionally a list
- of PeerTracker instances) and return a list of servers that have shares.
- """
- servers = []
- existing_shares = existing_shares.copy()
- if used_peers:
- peerdict = {}
- for peer in used_peers:
- peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
- for k in peerdict.keys():
- if existing_shares.has_key(k):
- # Prevent overcounting; favor the bucket, and not the
- # prexisting share.
- del(existing_shares[k])
- peers = list(used_peers.copy())
- # We do this because the preexisting shares list goes by peerid.
- peers = [x.peerid for x in peers]
- servers.extend(peers)
- servers.extend(existing_shares.values())
- return list(set(servers))
-
-def shares_by_server(existing_shares):
- """
- I accept a dict of shareid -> peerid mappings, and return a dict
- of peerid -> shareid mappings
- """
- servers = {}
- for server in set(existing_shares.values()):
- servers[server] = set([x for x in existing_shares.keys()
- if existing_shares[x] == server])
- return servers
-
-def should_add_server(existing_shares, server, bucket):
- """
- I tell my caller whether the servers_of_happiness number will be
- increased or decreased if a particular server is added as the peer
- already holding a particular share. I take a dictionary, a peerid,
- and a bucket as arguments, and return a boolean.
- """
- old_size = len(servers_with_unique_shares(existing_shares))
- new_candidate = existing_shares.copy()
- new_candidate[bucket] = server
- new_size = len(servers_with_unique_shares(new_candidate))
- return old_size < new_size
class Tahoe2PeerSelector:
@return: (used_peers, already_peers), where used_peers is a set of
PeerTracker instances that have agreed to hold some shares
for us (the shnum is stashed inside the PeerTracker),
- and already_peers is a dict mapping shnum to a peer
- which claims to already have the share.
+ and already_peers is a dict mapping shnum to a set of peers
+ which claim to already have the share.
"""
if self._status:
self.needed_shares = needed_shares
self.homeless_shares = range(total_shares)
- # self.uncontacted_peers = list() # peers we haven't asked yet
self.contacted_peers = [] # peers worth asking again
self.contacted_peers2 = [] # peers that we have asked again
self._started_second_pass = False
self.use_peers = set() # PeerTrackers that have shares assigned to them
- self.preexisting_shares = {} # sharenum -> peerid holding the share
- # We don't try to allocate shares to these servers, since they've
- # said that they're incapable of storing shares of the size that
- # we'd want to store. We keep them around because they may have
- # existing shares for this storage index, which we want to know
- # about for accurate servers_of_happiness accounting
- self.readonly_peers = []
- # These peers have shares -- any shares -- for our SI. We keep track
- # of these to write an error message with them later.
- self.peers_with_shares = []
-
- peers = storage_broker.get_servers_for_index(storage_index)
- if not peers:
- raise NoServersError("client gave us zero peers")
+ self.preexisting_shares = {} # shareid => set(peerids) holding shareid
+ # We don't try to allocate shares to these servers, since they've said
+ # that they're incapable of storing shares of the size that we'd want
+ # to store. We keep them around because they may have existing shares
+ # for this storage index, which we want to know about for accurate
+ # servers_of_happiness accounting
+ # (set to a list of PeerTrackers below; None until then)
+ self.readonly_peers = None
+ # These peers have shares -- any shares -- for our SI. We keep
+ # track of these to write an error message with them later.
+ self.peers_with_shares = set()
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
num_share_hashes, EXTENSION_SIZE,
None)
allocated_size = wbp.get_allocated_size()
+ all_peers = storage_broker.get_servers_for_index(storage_index)
+ if not all_peers:
+ raise NoServersError("client gave us zero peers")
# filter the list of peers according to which ones can accommodate
# this request. This excludes older peers (which used a 4-byte size
(peerid, conn) = peer
v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
- new_peers = [peer for peer in peers
- if _get_maxsize(peer) >= allocated_size]
- old_peers = list(set(peers).difference(set(new_peers)))
- peers = new_peers
+ writable_peers = [peer for peer in all_peers
+ if _get_maxsize(peer) >= allocated_size]
+ readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
storage_index)
def _make_trackers(peers):
- return [ PeerTracker(peerid, conn,
- share_size, block_size,
- num_segments, num_share_hashes,
- storage_index,
- bucket_renewal_secret_hash(file_renewal_secret,
- peerid),
- bucket_cancel_secret_hash(file_cancel_secret,
- peerid))
+ return [PeerTracker(peerid, conn,
+ share_size, block_size,
+ num_segments, num_share_hashes,
+ storage_index,
+ bucket_renewal_secret_hash(file_renewal_secret,
+ peerid),
+ bucket_cancel_secret_hash(file_cancel_secret,
+ peerid))
for (peerid, conn) in peers]
- self.uncontacted_peers = _make_trackers(peers)
- self.readonly_peers = _make_trackers(old_peers)
- # Talk to the readonly servers to get an idea of what servers
- # have what shares (if any) for this storage index
- d = defer.maybeDeferred(self._existing_shares)
- d.addCallback(lambda ign: self._loop())
- return d
-
- def _existing_shares(self):
- if self.readonly_peers:
- peer = self.readonly_peers.pop()
+ self.uncontacted_peers = _make_trackers(writable_peers)
+ self.readonly_peers = _make_trackers(readonly_peers)
+ # We now ask peers that can't hold any new shares about existing
+ # shares that they might have for our SI. Once this is done, we
+ # start placing the shares that we haven't already accounted
+ # for.
+ ds = []
+ if self._status and self.readonly_peers:
+ self._status.set_status("Contacting readonly peers to find "
+ "any existing shares")
+ for peer in self.readonly_peers:
assert isinstance(peer, PeerTracker)
- d = peer.query_allocated()
+ d = peer.ask_about_existing_shares()
d.addBoth(self._handle_existing_response, peer.peerid)
+ ds.append(d)
self.num_peers_contacted += 1
self.query_count += 1
- log.msg("asking peer %s for any existing shares for upload id %s"
+ log.msg("asking peer %s for any existing shares for "
+ "upload id %s"
% (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
level=log.NOISY, parent=self._log_parent)
- if self._status:
- self._status.set_status("Contacting Peer %s to find "
- "any existing shares"
- % idlib.shortnodeid_b2a(peer.peerid))
- return d
+ dl = defer.DeferredList(ds)
+ dl.addCallback(lambda ign: self._loop())
+ return dl
+
def _handle_existing_response(self, res, peer):
+ """
+ I handle responses to the ask_about_existing_shares queries that we
+ send to readonly peers, asking which shares they already hold for
+ our storage index.
+ """
if isinstance(res, failure.Failure):
log.msg("%s got error during existing shares check: %s"
% (idlib.shortnodeid_b2a(peer), res),
else:
buckets = res
if buckets:
- self.peers_with_shares.append(peer)
+ self.peers_with_shares.add(peer)
log.msg("response from peer %s: alreadygot=%s"
% (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
level=log.NOISY, parent=self._log_parent)
for bucket in buckets:
- if should_add_server(self.preexisting_shares, peer, bucket):
- self.preexisting_shares[bucket] = peer
- if self.homeless_shares and bucket in self.homeless_shares:
- self.homeless_shares.remove(bucket)
+ self.preexisting_shares.setdefault(bucket, set()).add(peer)
+ if self.homeless_shares and bucket in self.homeless_shares:
+ self.homeless_shares.remove(bucket)
self.full_count += 1
self.bad_query_count += 1
- return self._existing_shares()
+
def _get_progress_message(self):
if not self.homeless_shares:
def _loop(self):
if not self.homeless_shares:
- effective_happiness = servers_with_unique_shares(
- self.preexisting_shares,
- self.use_peers)
- if self.servers_of_happiness <= len(effective_happiness):
+ merged = merge_peers(self.preexisting_shares, self.use_peers)
+ effective_happiness = servers_of_happiness(merged)
+ if self.servers_of_happiness <= effective_happiness:
msg = ("peer selection successful for %s: %s" % (self,
self._get_progress_message()))
log.msg(msg, parent=self._log_parent)
return (self.use_peers, self.preexisting_shares)
else:
- delta = self.servers_of_happiness - len(effective_happiness)
+ # We're not okay right now, but maybe we can fix it by
+ # redistributing some shares. In cases where one or two
+ # servers have, before the upload, all or most of the
+ # shares for a given SI, this can work by giving _loop
+ # a chance to spread those out over the other peers.
+ delta = self.servers_of_happiness - effective_happiness
shares = shares_by_server(self.preexisting_shares)
# Each server in shares maps to a set of shares stored on it.
# Since we want to keep at least one share on each server
in shares.items()])
if delta <= len(self.uncontacted_peers) and \
shares_to_spread >= delta:
- # Loop through the allocated shares, removing
- # one from each server that has more than one and putting
- # it back into self.homeless_shares until we've done
- # this delta times.
items = shares.items()
while len(self.homeless_shares) < delta:
- servernum, sharelist = items.pop()
+ # Loop through the allocated shares, removing
+ # one from each server that has more than one
+ # and putting it back into self.homeless_shares
+ # until we've done this delta times.
+ server, sharelist = items.pop()
if len(sharelist) > 1:
share = sharelist.pop()
self.homeless_shares.append(share)
- del(self.preexisting_shares[share])
- items.append((servernum, sharelist))
+ self.preexisting_shares[share].remove(server)
+ if not self.preexisting_shares[share]:
+ del self.preexisting_shares[share]
+ items.append((server, sharelist))
return self._loop()
else:
- peer_count = len(list(set(self.peers_with_shares)))
+ # Redistribution won't help us; fail.
+ peer_count = len(self.peers_with_shares)
- # If peer_count < needed_shares, then the second error
- # message is nonsensical, so we use this one.
- if peer_count < self.needed_shares:
- msg = ("shares could only be placed or found on %d "
- "server(s). "
- "We were asked to place shares on at least %d "
- "server(s) such that any %d of them have "
- "enough shares to recover the file." %
- (peer_count,
- self.servers_of_happiness,
- self.needed_shares))
- # Otherwise, if we've placed on at least needed_shares
- # peers, but there isn't an x-happy subset of those peers
- # for x < needed_shares, we use this error message.
- elif len(effective_happiness) < self.needed_shares:
- msg = ("shares could be placed or found on %d "
- "server(s), but they are not spread out evenly "
- "enough to ensure that any %d of these servers "
- "would have enough shares to recover the file. "
- "We were asked to place "
- "shares on at least %d servers such that any "
- "%d of them have enough shares to recover the "
- "file." %
- (peer_count,
- self.needed_shares,
- self.servers_of_happiness,
- self.needed_shares))
- # Otherwise, if there is an x-happy subset of peers where
- # x >= needed_shares, but x < shares_of_happiness, then
- # we use this message.
- else:
- msg = ("shares could only be placed on %d server(s) "
- "such that any %d of them have enough shares "
- "to recover the file, but we were asked to use "
- "at least %d such servers." %
- (len(effective_happiness),
- self.needed_shares,
- self.servers_of_happiness))
- raise UploadUnhappinessError(msg)
+ msg = failure_message(peer_count,
+ self.needed_shares,
+ self.servers_of_happiness,
+ effective_happiness)
+ raise UploadUnhappinessError("%s (%s)" % (msg,
+ self._get_progress_message()))
if self.uncontacted_peers:
peer = self.uncontacted_peers.pop(0)
else:
# no more peers. If we haven't placed enough shares, we fail.
placed_shares = self.total_shares - len(self.homeless_shares)
- effective_happiness = servers_with_unique_shares(
- self.preexisting_shares,
- self.use_peers)
- if len(effective_happiness) < self.servers_of_happiness:
- msg = ("peer selection failed for %s: %s" % (self,
+ merged = merge_peers(self.preexisting_shares, self.use_peers)
+ effective_happiness = servers_of_happiness(merged)
+ if effective_happiness < self.servers_of_happiness:
+ msg = failure_message(len(self.peers_with_shares),
+ self.needed_shares,
+ self.servers_of_happiness,
+ effective_happiness)
+ msg = ("peer selection failed for %s: %s (%s)" % (self,
+ msg,
self._get_progress_message()))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
level=log.NOISY, parent=self._log_parent)
progress = False
for s in alreadygot:
- if should_add_server(self.preexisting_shares,
- peer.peerid, s):
- self.preexisting_shares[s] = peer.peerid
- if s in self.homeless_shares:
- self.homeless_shares.remove(s)
+ self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
+ if s in self.homeless_shares:
+ self.homeless_shares.remove(s)
+ progress = True
+ elif s in shares_to_ask:
+ progress = True
# the PeerTracker will remember which shares were allocated on
# that peer. We just have to remember to use them.
progress = True
if allocated or alreadygot:
- self.peers_with_shares.append(peer.peerid)
+ self.peers_with_shares.add(peer.peerid)
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
if progress:
- # they accepted or already had at least one share, so
- # progress has been made
+ # They accepted at least one of the shares that we asked
+ # them to accept, or they had a share that we didn't ask
+ # them to accept but that we hadn't placed yet, so this
+ # was a productive query
self.good_query_count += 1
else:
self.bad_query_count += 1
def set_shareholders(self, (used_peers, already_peers), encoder):
"""
@param used_peers: a sequence of PeerTracker objects
- @paran already_peers: a dict mapping sharenum to a peerid that
- claims to already have this share
+ @param already_peers: a dict mapping sharenum to a set of peerids
+ that claim to already have this share
"""
self.log("_send_shares, used_peers is %s" % (used_peers,))
# record already-present shares in self._results
buckets.update(peer.buckets)
for shnum in peer.buckets:
self._peer_trackers[shnum] = peer
- servermap[shnum] = peer.peerid
+ servermap.setdefault(shnum, set()).add(peer.peerid)
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
encoder.set_shareholders(buckets, servermap)
--- /dev/null
+"""
+I contain utilities useful for calculating servers_of_happiness, and for
+reporting it in messages.
+"""
+
+def failure_message(peer_count, k, happy, effective_happy):
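+ """
+ I build a human-readable explanation of why an upload could not be
+ made happy, from the number of distinct servers holding shares
+ (peer_count), the number of shares needed to recover the file (k),
+ the happiness target (happy), and the happiness actually achieved
+ (effective_happy). A rough doctest-style sketch with made-up numbers
+ (2 servers hold shares, k=3, happy=7), which exercises the first
+ branch below:
+
+ >>> failure_message(2, 3, 7, 2).startswith(
+ ...     "shares could be placed or found on only 2 server(s)")
+ True
+ """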
+ # If peer_count < needed_shares, this error message makes more
+ # sense than any of the others, so use it.
+ if peer_count < k:
+ msg = ("shares could be placed or found on only %d "
+ "server(s). "
+ "We were asked to place shares on at least %d "
+ "server(s) such that any %d of them have "
+ "enough shares to recover the file." %
+ (peer_count, happy, k))
+ # Otherwise, if we've placed on at least needed_shares
+ # peers, but there isn't an x-happy subset of those peers
+ # for x >= needed_shares, we use this error message.
+ elif effective_happy < k:
+ msg = ("shares could be placed or found on %d "
+ "server(s), but they are not spread out evenly "
+ "enough to ensure that any %d of these servers "
+ "would have enough shares to recover the file. "
+ "We were asked to place "
+ "shares on at least %d servers such that any "
+ "%d of them have enough shares to recover the "
+ "file." %
+ (peer_count, k, happy, k))
+ # Otherwise, if there is an x-happy subset of peers where
+ # x >= needed_shares, but x < servers_of_happiness, then
+ # we use this message.
+ else:
+ msg = ("shares could be placed on only %d server(s) "
+ "such that any %d of them have enough shares "
+ "to recover the file, but we were asked to "
+ "place shares on at least %d such servers." %
+ (effective_happy, k, happy))
+ return msg
+
+
+def shares_by_server(servermap):
+ """
+ I accept a dict of shareid -> set(peerid) mappings, and return a
+ dict of peerid -> set(shareid) mappings. My argument is a dictionary
+ with sets of peers, indexed by shares, and I transform that into a
+ dictionary of sets of shares, indexed by peerids.
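+
+ A rough doctest-style sketch, using made-up peerids:
+
+ >>> m = shares_by_server({1: set(["peer_a"]),
+ ...                       2: set(["peer_a", "peer_b"])})
+ >>> sorted(m["peer_a"]), sorted(m["peer_b"])
+ ([1, 2], [2])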
+ """
+ ret = {}
+ for shareid, peers in servermap.iteritems():
+ assert isinstance(peers, set)
+ for peerid in peers:
+ ret.setdefault(peerid, set()).add(shareid)
+ return ret
+
+def merge_peers(servermap, used_peers=None):
+ """
+ I accept a dict of shareid -> set(peerid) mappings, and optionally a
+ set of PeerTrackers. If no set of PeerTrackers is provided, I return
+ my first argument unmodified. Otherwise, I update a copy of my first
+ argument to include the shareid -> peerid mappings implied in the
+ set of PeerTrackers, returning the resulting dict.
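+
+ A rough doctest-style sketch. FakeTracker is a made-up stand-in for
+ a PeerTracker; I only rely on the .peerid and .buckets attributes of
+ the trackers I am given, and only the keys of .buckets matter here:
+
+ >>> class FakeTracker:
+ ...     def __init__(self, peerid, buckets):
+ ...         self.peerid = peerid
+ ...         self.buckets = buckets
+ >>> merged = merge_peers({1: set(["peer_a"])},
+ ...                      set([FakeTracker("peer_b", {1: None, 2: None})]))
+ >>> sorted(merged[1]), sorted(merged[2])
+ (['peer_a', 'peer_b'], ['peer_b'])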
+ """
+ if not used_peers:
+ return servermap
+
+ assert(isinstance(servermap, dict))
+ assert(isinstance(used_peers, set))
+
+ # We mutate servermap below, and our caller doesn't expect us to
+ # mutate its copy, so make a copy of servermap and work with that
+ # instead.
+ servermap = servermap.copy()
+ for peer in used_peers:
+ for shnum in peer.buckets:
+ servermap.setdefault(shnum, set()).add(peer.peerid)
+ return servermap
+
+def servers_of_happiness(sharemap):
+ """
+ I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
+ return the 'servers_of_happiness' number that sharemap results in.
+
+ To calculate the 'servers_of_happiness' number for the sharemap, I
+ construct a bipartite graph with servers in one partition of vertices
+ and shares in the other, and with an edge between a server s and a share t
+ if s is to store t. I then compute the size of a maximum matching in
+ the resulting graph; this is then returned as the 'servers_of_happiness'
+ for my arguments.
+
+ For example, consider the following layout:
+
+ server 1: shares 1, 2, 3, 4
+ server 2: share 6
+ server 3: share 3
+ server 4: share 4
+ server 5: share 2
+
+ From this, we can construct the following graph:
+
+ L = {server 1, server 2, server 3, server 4, server 5}
+ R = {share 1, share 2, share 3, share 4, share 6}
+ V = L U R
+ E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
+ (server 1, share 4), (server 2, share 6), (server 3, share 3),
+ (server 4, share 4), (server 5, share 2)}
+ G = (V, E)
+
+ Note that G is bipartite since every edge in E has one endpoint in L
+ and one endpoint in R.
+
+ A matching in a graph G is a subset M of E such that, for any vertex
+ v in V, v is incident to at most one edge of M. A maximum matching
+ in G is a matching that is no smaller than any other matching. For
+ this graph, a matching of cardinality 5 is:
+
+ M = {(server 1, share 1), (server 2, share 6),
+ (server 3, share 3), (server 4, share 4),
+ (server 5, share 2)}
+
+ Since G is bipartite, and since |L| = 5, we cannot have an M' such
+ that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
+ as long as k <= 5, we can see that the layout above has
+ servers_of_happiness = 5, which matches the results here.
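+
+ As a doctest-style sketch of that same layout (the peerids here are
+ just illustrative strings):
+
+ >>> servers_of_happiness({1: set(["server1"]),
+ ...                       2: set(["server1", "server5"]),
+ ...                       3: set(["server1", "server3"]),
+ ...                       4: set(["server1", "server4"]),
+ ...                       6: set(["server2"])})
+ 5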
+ """
+ if sharemap == {}:
+ return 0
+ sharemap = shares_by_server(sharemap)
+ graph = flow_network_for(sharemap)
+ # This is an implementation of the Ford-Fulkerson method for finding
+ # a maximum flow in a flow network applied to a bipartite graph.
+ # Specifically, it is the Edmonds-Karp algorithm, since it uses a
+ # BFS to find the shortest augmenting path at each iteration, if one
+ # exists.
+ #
+ # The implementation here is an adaptation of an algorithm described in
+ # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
+ dim = len(graph)
+ flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
+ residual_graph, residual_function = residual_network(graph, flow_function)
+ while augmenting_path_for(residual_graph):
+ path = augmenting_path_for(residual_graph)
+ # Delta is the largest amount that we can increase flow across
+ # all of the edges in path. Because of the way that the residual
+ # function is constructed, f[u][v] for a particular edge (u, v)
+ # is the amount of unused capacity on that edge. Taking the
+ # minimum of a list of those values for each edge in the
+ # augmenting path gives us our delta.
+ delta = min(map(lambda (u, v): residual_function[u][v], path))
+ for (u, v) in path:
+ flow_function[u][v] += delta
+ flow_function[v][u] -= delta
+ residual_graph, residual_function = residual_network(graph,
+ flow_function)
+ num_servers = len(sharemap)
+ # The value of a flow is the total flow out of the source vertex
+ # (vertex 0, in our graph). We could just as well sum across all of
+ # f[0], but we know that vertex 0 only has edges to the servers in
+ # our graph, so we can stop after summing flow across those. The
+ # value of a flow computed in this way is the size of a maximum
+ # matching on the bipartite graph described above.
+ return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
+
+def flow_network_for(sharemap):
+ """
+ I take my argument, a dict of peerid -> set(shareid) mappings, and
+ turn it into a flow network suitable for use with Edmonds-Karp. I
+ then return the adjacency list representation of that network.
+
+ Specifically, I build G = (V, E), where:
+ V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
+ E = {(s, peerid) for each peerid}
+ U {(peerid, shareid) if peerid is to store shareid }
+ U {(shareid, t) for each shareid}
+
+ s and t will be source and sink nodes when my caller starts treating
+ the graph I return like a flow network. Without s and t, the
+ returned graph is bipartite.
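+
+ A doctest-style sketch for a single made-up server holding a single
+ share: index 0 is the source s, index 1 the server, index 2 the
+ share, and index 3 the sink t.
+
+ >>> flow_network_for({"peer_a": set([0])})
+ [[1], [2], [3], []]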
+ """
+ # Servers don't have integral identifiers, and we can't make any
+ # assumptions about the way shares are indexed -- it's possible that
+ # there are missing shares, for example. So before making a graph,
+ # we re-index so that all of our vertices have integral indices
+ # without any holes. We start indexing at 1, so that we
+ # can add a source node at index 0.
+ sharemap, num_shares = reindex(sharemap, base_index=1)
+ num_servers = len(sharemap)
+ graph = [] # index -> [index], an adjacency list
+ # Add an entry at the top (index 0) that has an edge to every server
+ # in sharemap
+ graph.append(sharemap.keys())
+ # For each server, add an entry that has an edge to every share that it
+ # contains (or will contain).
+ for k in sharemap:
+ graph.append(sharemap[k])
+ # For each share, add an entry that has an edge to the sink.
+ sink_num = num_servers + num_shares + 1
+ for i in xrange(num_shares):
+ graph.append([sink_num])
+ # Add an empty entry for the sink, which has no outbound edges.
+ graph.append([])
+ return graph
+
+def reindex(sharemap, base_index):
+ """
+ Given sharemap, I map peerids and shareids to integers that don't
+ conflict with each other, so they're useful as indices in a graph. I
+ return a sharemap that is reindexed appropriately, and also the
+ number of distinct shares in the resulting sharemap as a convenience
+ for my caller. base_index tells me where to start indexing.
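+
+ A doctest-style sketch with made-up peerids: the two servers get
+ indices 1 and 2, and the two distinct shares get indices 3 and 4.
+
+ >>> ret, num_shares = reindex({"peer_a": set([2, 7]),
+ ...                            "peer_b": set([7])}, base_index=1)
+ >>> sorted(ret.keys()), num_shares
+ ([1, 2], 2)
+ >>> sorted(set(ret[1] + ret[2]))
+ [3, 4]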
+ """
+ shares = {} # shareid -> vertex index
+ num = base_index
+ ret = {} # peerid -> [shareid], a reindexed sharemap.
+ # Number the servers first
+ for k in sharemap:
+ ret[num] = sharemap[k]
+ num += 1
+ # Number the shares
+ for k in ret:
+ for shnum in ret[k]:
+ if not shares.has_key(shnum):
+ shares[shnum] = num
+ num += 1
+ ret[k] = map(lambda x: shares[x], ret[k])
+ return (ret, len(shares))
+
+def residual_network(graph, f):
+ """
+ I return the residual network and residual capacity function of the
+ flow network represented by my graph and f arguments. graph is a
+ flow network in adjacency-list form, and f is a flow in graph.
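+
+ A doctest-style sketch: for a two-vertex graph with one edge and no
+ flow on it yet, the edge stays in the residual network with residual
+ capacity 1, and the reverse direction gets capacity -1.
+
+ >>> residual_network([[1], []], [[0, 0], [0, 0]])
+ ([[1], []], [[0, 1], [-1, 0]])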
+ """
+ new_graph = [[] for i in xrange(len(graph))]
+ cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
+ for i in xrange(len(graph)):
+ for v in graph[i]:
+ if f[i][v] == 1:
+ # We add an edge (v, i) with cf[v,i] = 1. This means
+ # that we can remove 1 unit of flow from the edge (i, v)
+ new_graph[v].append(i)
+ cf[v][i] = 1
+ cf[i][v] = -1
+ else:
+ # We add the edge (i, v), since we're not using it right
+ # now.
+ new_graph[i].append(v)
+ cf[i][v] = 1
+ cf[v][i] = -1
+ return (new_graph, cf)
+
+def augmenting_path_for(graph):
+ """
+ I return an augmenting path, if there is one, from the source node
+ to the sink node in the flow network represented by my graph argument.
+ If there is no augmenting path, I return False. I assume that the
+ source node is at index 0 of graph, and the sink node is at the last
+ index. I also assume that graph is a flow network in adjacency list
+ form.
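+
+ A doctest-style sketch on a tiny three-vertex network (source 0,
+ sink 2): a path exists through vertex 1, but not when the edge from
+ 1 to 2 is missing.
+
+ >>> augmenting_path_for([[1], [2], []])
+ [(0, 1), (1, 2)]
+ >>> augmenting_path_for([[1], [], []])
+ False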
+ """
+ bfs_tree = bfs(graph, 0)
+ if bfs_tree[len(graph) - 1]:
+ n = len(graph) - 1
+ path = [] # [(u, v)], where u and v are vertices in the graph
+ while n != 0:
+ path.insert(0, (bfs_tree[n], n))
+ n = bfs_tree[n]
+ return path
+ return False
+
+def bfs(graph, s):
+ """
+ Perform a BFS on graph starting at s, where graph is a graph in
+ adjacency list form, and s is a node in graph. I return the
+ predecessor table that the BFS generates.
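+
+ A doctest-style sketch on a small three-vertex graph:
+
+ >>> bfs([[1, 2], [2], []], 0)
+ [None, 0, 0]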
+ """
+ # This is an adaptation of the BFS described in "Introduction to
+ # Algorithms", Cormen et al, 2nd ed., p. 532.
+ # WHITE vertices are those that we haven't seen or explored yet.
+ WHITE = 0
+ # GRAY vertices are those we have seen, but haven't explored yet
+ GRAY = 1
+ # BLACK vertices are those we have seen and explored
+ BLACK = 2
+ color = [WHITE for i in xrange(len(graph))]
+ predecessor = [None for i in xrange(len(graph))]
+ distance = [-1 for i in xrange(len(graph))]
+ queue = [s] # vertices that we haven't explored yet.
+ color[s] = GRAY
+ distance[s] = 0
+ while queue:
+ n = queue.pop(0)
+ for v in graph[n]:
+ if color[v] == WHITE:
+ color[v] = GRAY
+ distance[v] = distance[n] + 1
+ predecessor[v] = n
+ queue.append(v)
+ color[n] = BLACK
+ return predecessor