2 I contain utilities useful for calculating servers_of_happiness, and for
3 reporting it in messages
6 from copy import deepcopy
def failure_message(peer_count, k, happy, effective_happy):
    """
    Build a human-readable message explaining why an upload failed to
    reach its servers-of-happiness target.

    :param peer_count: number of servers that shares could be placed or
        found on.
    :param k: needed_shares -- how many shares are needed to recover the
        file.
    :param happy: the servers_of_happiness value the upload was asked to
        achieve.
    :param effective_happy: the happiness value actually achieved.
    :return: an English sentence describing the failure.
    """
    # If peer_count < needed_shares, this error message makes more
    # sense than any of the others, so use it.
    if peer_count < k:
        msg = ("shares could be placed or found on only %d "
               "server(s). "
               "We were asked to place shares on at least %d "
               "server(s) such that any %d of them have "
               "enough shares to recover the file." %
               (peer_count, happy, k))
    # Otherwise, if we've placed on at least needed_shares
    # peers, but there isn't an x-happy subset of those peers
    # for x >= needed_shares, we use this error message.
    elif effective_happy < k:
        msg = ("shares could be placed or found on %d "
               "server(s), but they are not spread out evenly "
               "enough to ensure that any %d of these servers "
               "would have enough shares to recover the file. "
               "We were asked to place "
               "shares on at least %d servers such that any "
               "%d of them have enough shares to recover the "
               "file." %
               (peer_count, k, happy, k))
    # Otherwise, if there is an x-happy subset of peers where
    # x >= needed_shares, but x < servers_of_happiness, then
    # we use this message.
    else:
        msg = ("shares could be placed on only %d server(s) "
               "such that any %d of them have enough shares "
               "to recover the file, but we were asked to "
               "place shares on at least %d such servers." %
               (effective_happy, k, happy))
    return msg
def shares_by_server(servermap):
    """
    Invert a share-to-server map.

    I accept a dict of shareid -> set(peerid) mappings, and return a
    dict of peerid -> set(shareid) mappings. My argument is a dictionary
    with sets of peers, indexed by shares, and I transform that into a
    dictionary of sets of shares, indexed by peerids.
    """
    ret = {}
    # .items() (not the Python-2-only .iteritems()) so this runs on
    # both Python 2 and 3.
    for shareid, peers in servermap.items():
        assert isinstance(peers, set)
        for peerid in peers:
            ret.setdefault(peerid, set()).add(shareid)
    return ret
def merge_servers(servermap, upload_trackers=None):
    """
    I accept a dict of shareid -> set(serverid) mappings, and optionally a
    set of ServerTrackers. If no set of ServerTrackers is provided, I return
    my first argument unmodified. Otherwise, I update a copy of my first
    argument to include the shareid -> serverid mappings implied in the
    set of ServerTrackers, returning the resulting dict.

    :param servermap: dict of shareid -> set(serverid).
    :param upload_trackers: optional set of trackers, each exposing a
        ``buckets`` mapping keyed by share number and a
        ``get_serverid()`` method.
    :return: a new dict; the caller's servermap is never mutated.
    """
    # Since we mutate servermap, and are called outside of a
    # context where it is okay to do that, make a copy of servermap and
    # work with it.
    servermap = deepcopy(servermap)
    if not upload_trackers:
        return servermap

    assert(isinstance(servermap, dict))
    assert(isinstance(upload_trackers, set))

    for tracker in upload_trackers:
        for shnum in tracker.buckets:
            servermap.setdefault(shnum, set()).add(tracker.get_serverid())
    return servermap
def servers_of_happiness(sharemap):
    """
    I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
    return the 'servers_of_happiness' number that sharemap results in.

    To calculate the 'servers_of_happiness' number for the sharemap, I
    construct a bipartite graph with servers in one partition of vertices
    and shares in the other, and with an edge between a server s and a
    share t if s is to store t. I then compute the size of a maximum
    matching in the resulting graph; this is then returned as the
    'servers_of_happiness' number.

    For example, a layout where server 1 holds shares 1-4 and four other
    servers each hold one distinct share admits a matching of
    cardinality 5 (one share per server), so its
    servers_of_happiness is 5.
    """
    if sharemap == {}:
        return 0
    sharemap = shares_by_server(sharemap)
    graph = flow_network_for(sharemap)
    # This is an implementation of the Ford-Fulkerson method for finding
    # a maximum flow in a flow network applied to a bipartite graph.
    # Specifically, it is the Edmonds-Karp algorithm, since it uses a
    # BFS to find the shortest augmenting path at each iteration, if one
    # exists.
    #
    # The implementation here is an adaptation of an algorithm described
    # in "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
    dim = len(graph)
    flow_function = [[0 for sh in range(dim)] for s in range(dim)]
    residual_graph, residual_function = residual_network(graph, flow_function)
    # Compute each shortest augmenting path once per iteration (the
    # previous code called augmenting_path_for both in the loop test and
    # again in the loop body, doing every BFS twice).
    path = augmenting_path_for(residual_graph)
    while path:
        # Delta is the largest amount that we can increase flow across
        # all of the edges in path. Because of the way that the residual
        # function is constructed, residual_function[u][v] for a
        # particular edge (u, v) is the amount of unused capacity on
        # that edge. Taking the minimum of those values for each edge in
        # the augmenting path gives us our delta.
        delta = min(residual_function[u][v] for (u, v) in path)
        for (u, v) in path:
            flow_function[u][v] += delta
            flow_function[v][u] -= delta
        residual_graph, residual_function = residual_network(graph,
                                                            flow_function)
        path = augmenting_path_for(residual_graph)
    num_servers = len(sharemap)
    # The value of a flow is the total flow out of the source vertex
    # (vertex 0, in our graph). We could just as well sum across all of
    # flow_function[0], but we know that vertex 0 only has edges to the
    # servers in our graph, so we can stop after summing flow across
    # those. The value of a flow computed in this way is the size of a
    # maximum matching on the bipartite graph described above.
    return sum(flow_function[0][v] for v in range(1, num_servers + 1))
def flow_network_for(sharemap):
    """
    I take my argument, a dict of peerid -> set(shareid) mappings, and
    turn it into a flow network suitable for use with Edmonds-Karp. I
    then return the adjacency list representation of that network.

    Specifically, I build G = (V, E), where:
      V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
      E = {(s, peerid) for each peerid}
          U {(peerid, shareid) if peerid is to store shareid }
          U {(shareid, t) for each shareid}

    s and t will be source and sink nodes when my caller starts treating
    the graph I return like a flow network. Without s and t, the
    returned graph is bipartite.
    """
    # Servers don't have integral identifiers, and we can't make any
    # assumptions about the way shares are indexed -- it's possible that
    # there are missing shares, for example. So before making a graph,
    # we re-index so that all of our vertices have integral indices, and
    # that there aren't any holes. We start indexing at 1, so that we
    # can add a source node at index 0.
    sharemap, num_shares = reindex(sharemap, base_index=1)
    num_servers = len(sharemap)
    graph = []  # index -> [index], an adjacency list
    # Add an entry at the top (index 0) that has an edge to every server.
    # A concrete list (not a dict-keys view) so the row is independent
    # of later mutations of sharemap.
    graph.append(list(sharemap.keys()))
    # For each server, add an entry that has an edge to every share that
    # it contains (or will contain). reindex numbered the servers
    # 1..num_servers, so append their rows in exactly that vertex order.
    for server_num in range(1, num_servers + 1):
        graph.append(sharemap[server_num])
    # For each share, add an entry that has an edge to the sink.
    sink_num = num_servers + num_shares + 1
    for i in range(num_shares):
        graph.append([sink_num])
    # Add an empty entry for the sink, which has no outbound edges.
    graph.append([])
    return graph
def reindex(sharemap, base_index):
    """
    Given sharemap, I map peerids and shareids to integers that don't
    conflict with each other, so they're useful as indices in a graph. I
    return a sharemap that is reindexed appropriately, and also the
    number of distinct shares in the resulting sharemap as a convenience
    for my caller. base_index tells me where to start indexing.
    """
    shares = {}  # shareid -> vertex index
    num = base_index
    ret = {}  # peerid -> [shareid], a reindexed sharemap.
    # Number the servers first.
    for k in sharemap:
        ret[num] = sharemap[k]
        num += 1
    # Number the shares, reusing an existing index when the same share
    # appears on more than one server.
    for k in ret:
        for shnum in ret[k]:
            # `in` rather than the Python-2-only has_key().
            if shnum not in shares:
                shares[shnum] = num
                num += 1
        # Concrete list (a Python-3 map() would be a one-shot iterator).
        ret[k] = [shares[shnum] for shnum in ret[k]]
    return (ret, len(shares))
def residual_network(graph, f):
    """
    I return the residual network and residual capacity function of the
    flow network represented by my graph and f arguments. graph is a
    flow network in adjacency-list form, and f is a flow in graph.
    """
    n = len(graph)
    new_graph = [[] for i in range(n)]
    cf = [[0 for s in range(n)] for sh in range(n)]
    for i in range(n):
        for v in graph[i]:
            if f[i][v] == 1:
                # The edge (i, v) is saturated, so we add the reverse
                # edge (v, i) with cf[v][i] = 1. This means that we can
                # remove 1 unit of flow from the edge (i, v).
                new_graph[v].append(i)
                cf[v][i] = 1
            else:
                # We add the edge (i, v), since we're not using it right
                # now; mark the reverse direction with -1 capacity.
                new_graph[i].append(v)
                cf[i][v] = 1
                cf[v][i] = -1
    return (new_graph, cf)
def augmenting_path_for(graph):
    """
    I return an augmenting path, if there is one, from the source node
    to the sink node in the flow network represented by my graph argument.
    If there is no augmenting path, I return False. I assume that the
    source node is at index 0 of graph, and the sink node is at the last
    index. I also assume that graph is a flow network in adjacency list
    form.
    """
    bfs_tree = bfs(graph, 0)
    # A non-None predecessor for the sink means the BFS reached it,
    # i.e. an augmenting path exists.
    if bfs_tree[len(graph) - 1]:
        n = len(graph) - 1
        path = []  # [(u, v)], where u and v are vertices in the graph
        # Walk predecessors back from the sink to the source, building
        # the path in source-to-sink order.
        while n != 0:
            path.insert(0, (bfs_tree[n], n))
            n = bfs_tree[n]
        return path
    return False
275 Perform a BFS on graph starting at s, where graph is a graph in
276 adjacency list form, and s is a node in graph. I return the
277 predecessor table that the BFS generates.
279 # This is an adaptation of the BFS described in "Introduction to
280 # Algorithms", Cormen et al, 2nd ed., p. 532.
281 # WHITE vertices are those that we haven't seen or explored yet.
283 # GRAY vertices are those we have seen, but haven't explored yet
285 # BLACK vertices are those we have seen and explored
287 color = [WHITE for i in xrange(len(graph))]
288 predecessor = [None for i in xrange(len(graph))]
289 distance = [-1 for i in xrange(len(graph))]
290 queue = [s] # vertices that we haven't explored yet.
296 if color[v] == WHITE:
298 distance[v] = distance[n] + 1