]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/util/happinessutil.py
Fix up the behavior of #778, per reviewers' comments
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / util / happinessutil.py
1 """
2 I contain utilities useful for calculating servers_of_happiness, and for
3 reporting it in messages
4 """
5
6 def failure_message(peer_count, k, happy, effective_happy):
7     # If peer_count < needed_shares, this error message makes more
8     # sense than any of the others, so use it.
9     if peer_count < k:
10         msg = ("shares could be placed or found on only %d "
11                "server(s). "
12                "We were asked to place shares on at least %d "
13                "server(s) such that any %d of them have "
14                "enough shares to recover the file." %
15                 (peer_count, happy, k))
16     # Otherwise, if we've placed on at least needed_shares
17     # peers, but there isn't an x-happy subset of those peers
18     # for x >= needed_shares, we use this error message.
19     elif effective_happy < k:
20         msg = ("shares could be placed or found on %d "
21                "server(s), but they are not spread out evenly "
22                "enough to ensure that any %d of these servers "
23                "would have enough shares to recover the file. "
24                "We were asked to place "
25                "shares on at least %d servers such that any "
26                "%d of them have enough shares to recover the "
27                "file." %
28                 (peer_count, k, happy, k))
29     # Otherwise, if there is an x-happy subset of peers where
30     # x >= needed_shares, but x < servers_of_happiness, then 
31     # we use this message.
32     else:
33         msg = ("shares could be placed on only %d server(s) "
34                "such that any %d of them have enough shares "
35                "to recover the file, but we were asked to "
36                "place shares on at least %d such servers." %
37                 (effective_happy, k, happy))
38     return msg
39
40
41 def shares_by_server(servermap):
42     """
43     I accept a dict of shareid -> set(peerid) mappings, and return a
44     dict of peerid -> set(shareid) mappings. My argument is a dictionary
45     with sets of peers, indexed by shares, and I transform that into a
46     dictionary of sets of shares, indexed by peerids.
47     """
48     ret = {}
49     for shareid, peers in servermap.iteritems():
50         assert isinstance(peers, set)
51         for peerid in peers:
52             ret.setdefault(peerid, set()).add(shareid)
53     return ret
54
55 def merge_peers(servermap, used_peers=None):
56     """
57     I accept a dict of shareid -> set(peerid) mappings, and optionally a
58     set of PeerTrackers. If no set of PeerTrackers is provided, I return
59     my first argument unmodified. Otherwise, I update a copy of my first
60     argument to include the shareid -> peerid mappings implied in the
61     set of PeerTrackers, returning the resulting dict.
62     """
63     if not used_peers:
64         return servermap
65
66     assert(isinstance(servermap, dict))
67     assert(isinstance(used_peers, set))
68
69     # Since we mutate servermap, and are called outside of a 
70     # context where it is okay to do that, make a copy of servermap and
71     # work with it.
72     servermap = servermap.copy()
73     for peer in used_peers:
74         for shnum in peer.buckets:
75             servermap.setdefault(shnum, set()).add(peer.peerid)
76     return servermap
77
78 def servers_of_happiness(sharemap):
79     """
80     I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
81     return the 'servers_of_happiness' number that sharemap results in.
82
83     To calculate the 'servers_of_happiness' number for the sharemap, I
84     construct a bipartite graph with servers in one partition of vertices
85     and shares in the other, and with an edge between a server s and a share t
86     if s is to store t. I then compute the size of a maximum matching in
87     the resulting graph; this is then returned as the 'servers_of_happiness'
88     for my arguments.
89
90     For example, consider the following layout:
91
92       server 1: shares 1, 2, 3, 4
93       server 2: share 6
94       server 3: share 3
95       server 4: share 4
96       server 5: share 2
97
98     From this, we can construct the following graph:
99
100       L = {server 1, server 2, server 3, server 4, server 5}
101       R = {share 1, share 2, share 3, share 4, share 6}
102       V = L U R
103       E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
104            (server 1, share 4), (server 2, share 6), (server 3, share 3),
105            (server 4, share 4), (server 5, share 2)}
106       G = (V, E)
107
108     Note that G is bipartite since every edge in e has one endpoint in L
109     and one endpoint in R.
110
111     A matching in a graph G is a subset M of E such that, for any vertex
112     v in V, v is incident to at most one edge of M. A maximum matching
113     in G is a matching that is no smaller than any other matching. For
114     this graph, a matching of cardinality 5 is:
115
116       M = {(server 1, share 1), (server 2, share 6),
117            (server 3, share 3), (server 4, share 4),
118            (server 5, share 2)}
119
120     Since G is bipartite, and since |L| = 5, we cannot have an M' such
121     that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
122     as long as k <= 5, we can see that the layout above has
123     servers_of_happiness = 5, which matches the results here.
124     """
125     if sharemap == {}:
126         return 0
127     sharemap = shares_by_server(sharemap)
128     graph = flow_network_for(sharemap)
129     # This is an implementation of the Ford-Fulkerson method for finding
130     # a maximum flow in a flow network applied to a bipartite graph. 
131     # Specifically, it is the Edmonds-Karp algorithm, since it uses a 
132     # BFS to find the shortest augmenting path at each iteration, if one
133     # exists. 
134     # 
135     # The implementation here is an adapation of an algorithm described in 
136     # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. 
137     dim = len(graph)
138     flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
139     residual_graph, residual_function = residual_network(graph, flow_function)
140     while augmenting_path_for(residual_graph):
141         path = augmenting_path_for(residual_graph)
142         # Delta is the largest amount that we can increase flow across
143         # all of the edges in path. Because of the way that the residual
144         # function is constructed, f[u][v] for a particular edge (u, v)
145         # is the amount of unused capacity on that edge. Taking the
146         # minimum of a list of those values for each edge in the
147         # augmenting path gives us our delta.
148         delta = min(map(lambda (u, v): residual_function[u][v], path))
149         for (u, v) in path:
150             flow_function[u][v] += delta
151             flow_function[v][u] -= delta
152         residual_graph, residual_function = residual_network(graph,
153                                                              flow_function)
154     num_servers = len(sharemap)
155     # The value of a flow is the total flow out of the source vertex
156     # (vertex 0, in our graph). We could just as well sum across all of
157     # f[0], but we know that vertex 0 only has edges to the servers in
158     # our graph, so we can stop after summing flow across those. The
159     # value of a flow computed in this way is the size of a maximum
160     # matching on the bipartite graph described above.
161     return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
162
163 def flow_network_for(sharemap):
164     """
165     I take my argument, a dict of peerid -> set(shareid) mappings, and
166     turn it into a flow network suitable for use with Edmonds-Karp. I
167     then return the adjacency list representation of that network.
168
169     Specifically, I build G = (V, E), where:
170       V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
171       E = {(s, peerid) for each peerid}
172           U {(peerid, shareid) if peerid is to store shareid }
173           U {(shareid, t) for each shareid}
174
175     s and t will be source and sink nodes when my caller starts treating
176     the graph I return like a flow network. Without s and t, the
177     returned graph is bipartite.
178     """
179     # Servers don't have integral identifiers, and we can't make any
180     # assumptions about the way shares are indexed -- it's possible that
181     # there are missing shares, for example. So before making a graph,
182     # we re-index so that all of our vertices have integral indices, and
183     # that there aren't any holes. We start indexing at 1, so that we
184     # can add a source node at index 0.
185     sharemap, num_shares = reindex(sharemap, base_index=1)
186     num_servers = len(sharemap)
187     graph = [] # index -> [index], an adjacency list
188     # Add an entry at the top (index 0) that has an edge to every server
189     # in sharemap 
190     graph.append(sharemap.keys())
191     # For each server, add an entry that has an edge to every share that it
192     # contains (or will contain).
193     for k in sharemap:
194         graph.append(sharemap[k])
195     # For each share, add an entry that has an edge to the sink.
196     sink_num = num_servers + num_shares + 1
197     for i in xrange(num_shares):
198         graph.append([sink_num])
199     # Add an empty entry for the sink, which has no outbound edges.
200     graph.append([])
201     return graph
202
203 def reindex(sharemap, base_index):
204     """
205     Given sharemap, I map peerids and shareids to integers that don't
206     conflict with each other, so they're useful as indices in a graph. I
207     return a sharemap that is reindexed appropriately, and also the
208     number of distinct shares in the resulting sharemap as a convenience
209     for my caller. base_index tells me where to start indexing.
210     """
211     shares  = {} # shareid  -> vertex index
212     num = base_index
213     ret = {} # peerid -> [shareid], a reindexed sharemap.
214     # Number the servers first
215     for k in sharemap:
216         ret[num] = sharemap[k]
217         num += 1
218     # Number the shares
219     for k in ret:
220         for shnum in ret[k]:
221             if not shares.has_key(shnum):
222                 shares[shnum] = num
223                 num += 1
224         ret[k] = map(lambda x: shares[x], ret[k])
225     return (ret, len(shares))
226
227 def residual_network(graph, f):
228     """
229     I return the residual network and residual capacity function of the
230     flow network represented by my graph and f arguments. graph is a
231     flow network in adjacency-list form, and f is a flow in graph.
232     """
233     new_graph = [[] for i in xrange(len(graph))]
234     cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
235     for i in xrange(len(graph)):
236         for v in graph[i]:
237             if f[i][v] == 1:
238                 # We add an edge (v, i) with cf[v,i] = 1. This means
239                 # that we can remove 1 unit of flow from the edge (i, v) 
240                 new_graph[v].append(i)
241                 cf[v][i] = 1
242                 cf[i][v] = -1
243             else:
244                 # We add the edge (i, v), since we're not using it right
245                 # now.
246                 new_graph[i].append(v)
247                 cf[i][v] = 1
248                 cf[v][i] = -1
249     return (new_graph, cf)
250
251 def augmenting_path_for(graph):
252     """
253     I return an augmenting path, if there is one, from the source node
254     to the sink node in the flow network represented by my graph argument.
255     If there is no augmenting path, I return False. I assume that the
256     source node is at index 0 of graph, and the sink node is at the last
257     index. I also assume that graph is a flow network in adjacency list
258     form.
259     """
260     bfs_tree = bfs(graph, 0)
261     if bfs_tree[len(graph) - 1]:
262         n = len(graph) - 1
263         path = [] # [(u, v)], where u and v are vertices in the graph
264         while n != 0:
265             path.insert(0, (bfs_tree[n], n))
266             n = bfs_tree[n]
267         return path
268     return False
269
270 def bfs(graph, s):
271     """
272     Perform a BFS on graph starting at s, where graph is a graph in
273     adjacency list form, and s is a node in graph. I return the
274     predecessor table that the BFS generates.
275     """
276     # This is an adaptation of the BFS described in "Introduction to
277     # Algorithms", Cormen et al, 2nd ed., p. 532.
278     # WHITE vertices are those that we haven't seen or explored yet.
279     WHITE = 0
280     # GRAY vertices are those we have seen, but haven't explored yet
281     GRAY  = 1
282     # BLACK vertices are those we have seen and explored
283     BLACK = 2
284     color        = [WHITE for i in xrange(len(graph))]
285     predecessor  = [None for i in xrange(len(graph))]
286     distance     = [-1 for i in xrange(len(graph))]
287     queue = [s] # vertices that we haven't explored yet.
288     color[s] = GRAY
289     distance[s] = 0
290     while queue:
291         n = queue.pop(0)
292         for v in graph[n]:
293             if color[v] == WHITE:
294                 color[v] = GRAY
295                 distance[v] = distance[n] + 1
296                 predecessor[v] = n
297                 queue.append(v)
298         color[n] = BLACK
299     return predecessor