]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/util/happinessutil.py
33ba5673432ec807ce8f02043cace22801617905
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / util / happinessutil.py
1 """
2 I contain utilities useful for calculating servers_of_happiness, and for
3 reporting it in messages
4 """
5
6 from copy import deepcopy
7
8 def failure_message(peer_count, k, happy, effective_happy):
9     # If peer_count < needed_shares, this error message makes more
10     # sense than any of the others, so use it.
11     if peer_count < k:
12         msg = ("shares could be placed or found on only %d "
13                "server(s). "
14                "We were asked to place shares on at least %d "
15                "server(s) such that any %d of them have "
16                "enough shares to recover the file." %
17                 (peer_count, happy, k))
18     # Otherwise, if we've placed on at least needed_shares
19     # peers, but there isn't an x-happy subset of those peers
20     # for x >= needed_shares, we use this error message.
21     elif effective_happy < k:
22         msg = ("shares could be placed or found on %d "
23                "server(s), but they are not spread out evenly "
24                "enough to ensure that any %d of these servers "
25                "would have enough shares to recover the file. "
26                "We were asked to place "
27                "shares on at least %d servers such that any "
28                "%d of them have enough shares to recover the "
29                "file." %
30                 (peer_count, k, happy, k))
31     # Otherwise, if there is an x-happy subset of peers where
32     # x >= needed_shares, but x < servers_of_happiness, then
33     # we use this message.
34     else:
35         msg = ("shares could be placed on only %d server(s) "
36                "such that any %d of them have enough shares "
37                "to recover the file, but we were asked to "
38                "place shares on at least %d such servers." %
39                 (effective_happy, k, happy))
40     return msg
41
42
43 def shares_by_server(servermap):
44     """
45     I accept a dict of shareid -> set(peerid) mappings, and return a
46     dict of peerid -> set(shareid) mappings. My argument is a dictionary
47     with sets of peers, indexed by shares, and I transform that into a
48     dictionary of sets of shares, indexed by peerids.
49     """
50     ret = {}
51     for shareid, peers in servermap.iteritems():
52         assert isinstance(peers, set)
53         for peerid in peers:
54             ret.setdefault(peerid, set()).add(shareid)
55     return ret
56
57 def merge_servers(servermap, upload_trackers=None):
58     """
59     I accept a dict of shareid -> set(serverid) mappings, and optionally a
60     set of ServerTrackers. If no set of ServerTrackers is provided, I return
61     my first argument unmodified. Otherwise, I update a copy of my first
62     argument to include the shareid -> serverid mappings implied in the
63     set of ServerTrackers, returning the resulting dict.
64     """
65     # Since we mutate servermap, and are called outside of a
66     # context where it is okay to do that, make a copy of servermap and
67     # work with it.
68     servermap = deepcopy(servermap)
69     if not upload_trackers:
70         return servermap
71
72     assert(isinstance(servermap, dict))
73     assert(isinstance(upload_trackers, set))
74
75     for tracker in upload_trackers:
76         for shnum in tracker.buckets:
77             servermap.setdefault(shnum, set()).add(tracker.get_serverid())
78     return servermap
79
80 def servers_of_happiness(sharemap):
81     """
82     I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
83     return the 'servers_of_happiness' number that sharemap results in.
84
85     To calculate the 'servers_of_happiness' number for the sharemap, I
86     construct a bipartite graph with servers in one partition of vertices
87     and shares in the other, and with an edge between a server s and a share t
88     if s is to store t. I then compute the size of a maximum matching in
89     the resulting graph; this is then returned as the 'servers_of_happiness'
90     for my arguments.
91
92     For example, consider the following layout:
93
94       server 1: shares 1, 2, 3, 4
95       server 2: share 6
96       server 3: share 3
97       server 4: share 4
98       server 5: share 2
99
100     From this, we can construct the following graph:
101
102       L = {server 1, server 2, server 3, server 4, server 5}
103       R = {share 1, share 2, share 3, share 4, share 6}
104       V = L U R
105       E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
106            (server 1, share 4), (server 2, share 6), (server 3, share 3),
107            (server 4, share 4), (server 5, share 2)}
108       G = (V, E)
109
110     Note that G is bipartite since every edge in e has one endpoint in L
111     and one endpoint in R.
112
113     A matching in a graph G is a subset M of E such that, for any vertex
114     v in V, v is incident to at most one edge of M. A maximum matching
115     in G is a matching that is no smaller than any other matching. For
116     this graph, a matching of cardinality 5 is:
117
118       M = {(server 1, share 1), (server 2, share 6),
119            (server 3, share 3), (server 4, share 4),
120            (server 5, share 2)}
121
122     Since G is bipartite, and since |L| = 5, we cannot have an M' such
123     that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
124     as long as k <= 5, we can see that the layout above has
125     servers_of_happiness = 5, which matches the results here.
126     """
127     if sharemap == {}:
128         return 0
129     sharemap = shares_by_server(sharemap)
130     graph = flow_network_for(sharemap)
131     # This is an implementation of the Ford-Fulkerson method for finding
132     # a maximum flow in a flow network applied to a bipartite graph.
133     # Specifically, it is the Edmonds-Karp algorithm, since it uses a
134     # BFS to find the shortest augmenting path at each iteration, if one
135     # exists.
136     #
137     # The implementation here is an adapation of an algorithm described in
138     # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
139     dim = len(graph)
140     flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
141     residual_graph, residual_function = residual_network(graph, flow_function)
142     while augmenting_path_for(residual_graph):
143         path = augmenting_path_for(residual_graph)
144         # Delta is the largest amount that we can increase flow across
145         # all of the edges in path. Because of the way that the residual
146         # function is constructed, f[u][v] for a particular edge (u, v)
147         # is the amount of unused capacity on that edge. Taking the
148         # minimum of a list of those values for each edge in the
149         # augmenting path gives us our delta.
150         delta = min(map(lambda (u, v): residual_function[u][v], path))
151         for (u, v) in path:
152             flow_function[u][v] += delta
153             flow_function[v][u] -= delta
154         residual_graph, residual_function = residual_network(graph,
155                                                              flow_function)
156     num_servers = len(sharemap)
157     # The value of a flow is the total flow out of the source vertex
158     # (vertex 0, in our graph). We could just as well sum across all of
159     # f[0], but we know that vertex 0 only has edges to the servers in
160     # our graph, so we can stop after summing flow across those. The
161     # value of a flow computed in this way is the size of a maximum
162     # matching on the bipartite graph described above.
163     return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
164
165 def flow_network_for(sharemap):
166     """
167     I take my argument, a dict of peerid -> set(shareid) mappings, and
168     turn it into a flow network suitable for use with Edmonds-Karp. I
169     then return the adjacency list representation of that network.
170
171     Specifically, I build G = (V, E), where:
172       V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
173       E = {(s, peerid) for each peerid}
174           U {(peerid, shareid) if peerid is to store shareid }
175           U {(shareid, t) for each shareid}
176
177     s and t will be source and sink nodes when my caller starts treating
178     the graph I return like a flow network. Without s and t, the
179     returned graph is bipartite.
180     """
181     # Servers don't have integral identifiers, and we can't make any
182     # assumptions about the way shares are indexed -- it's possible that
183     # there are missing shares, for example. So before making a graph,
184     # we re-index so that all of our vertices have integral indices, and
185     # that there aren't any holes. We start indexing at 1, so that we
186     # can add a source node at index 0.
187     sharemap, num_shares = reindex(sharemap, base_index=1)
188     num_servers = len(sharemap)
189     graph = [] # index -> [index], an adjacency list
190     # Add an entry at the top (index 0) that has an edge to every server
191     # in sharemap
192     graph.append(sharemap.keys())
193     # For each server, add an entry that has an edge to every share that it
194     # contains (or will contain).
195     for k in sharemap:
196         graph.append(sharemap[k])
197     # For each share, add an entry that has an edge to the sink.
198     sink_num = num_servers + num_shares + 1
199     for i in xrange(num_shares):
200         graph.append([sink_num])
201     # Add an empty entry for the sink, which has no outbound edges.
202     graph.append([])
203     return graph
204
205 def reindex(sharemap, base_index):
206     """
207     Given sharemap, I map peerids and shareids to integers that don't
208     conflict with each other, so they're useful as indices in a graph. I
209     return a sharemap that is reindexed appropriately, and also the
210     number of distinct shares in the resulting sharemap as a convenience
211     for my caller. base_index tells me where to start indexing.
212     """
213     shares  = {} # shareid  -> vertex index
214     num = base_index
215     ret = {} # peerid -> [shareid], a reindexed sharemap.
216     # Number the servers first
217     for k in sharemap:
218         ret[num] = sharemap[k]
219         num += 1
220     # Number the shares
221     for k in ret:
222         for shnum in ret[k]:
223             if not shares.has_key(shnum):
224                 shares[shnum] = num
225                 num += 1
226         ret[k] = map(lambda x: shares[x], ret[k])
227     return (ret, len(shares))
228
229 def residual_network(graph, f):
230     """
231     I return the residual network and residual capacity function of the
232     flow network represented by my graph and f arguments. graph is a
233     flow network in adjacency-list form, and f is a flow in graph.
234     """
235     new_graph = [[] for i in xrange(len(graph))]
236     cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
237     for i in xrange(len(graph)):
238         for v in graph[i]:
239             if f[i][v] == 1:
240                 # We add an edge (v, i) with cf[v,i] = 1. This means
241                 # that we can remove 1 unit of flow from the edge (i, v)
242                 new_graph[v].append(i)
243                 cf[v][i] = 1
244                 cf[i][v] = -1
245             else:
246                 # We add the edge (i, v), since we're not using it right
247                 # now.
248                 new_graph[i].append(v)
249                 cf[i][v] = 1
250                 cf[v][i] = -1
251     return (new_graph, cf)
252
253 def augmenting_path_for(graph):
254     """
255     I return an augmenting path, if there is one, from the source node
256     to the sink node in the flow network represented by my graph argument.
257     If there is no augmenting path, I return False. I assume that the
258     source node is at index 0 of graph, and the sink node is at the last
259     index. I also assume that graph is a flow network in adjacency list
260     form.
261     """
262     bfs_tree = bfs(graph, 0)
263     if bfs_tree[len(graph) - 1]:
264         n = len(graph) - 1
265         path = [] # [(u, v)], where u and v are vertices in the graph
266         while n != 0:
267             path.insert(0, (bfs_tree[n], n))
268             n = bfs_tree[n]
269         return path
270     return False
271
272 def bfs(graph, s):
273     """
274     Perform a BFS on graph starting at s, where graph is a graph in
275     adjacency list form, and s is a node in graph. I return the
276     predecessor table that the BFS generates.
277     """
278     # This is an adaptation of the BFS described in "Introduction to
279     # Algorithms", Cormen et al, 2nd ed., p. 532.
280     # WHITE vertices are those that we haven't seen or explored yet.
281     WHITE = 0
282     # GRAY vertices are those we have seen, but haven't explored yet
283     GRAY  = 1
284     # BLACK vertices are those we have seen and explored
285     BLACK = 2
286     color        = [WHITE for i in xrange(len(graph))]
287     predecessor  = [None for i in xrange(len(graph))]
288     distance     = [-1 for i in xrange(len(graph))]
289     queue = [s] # vertices that we haven't explored yet.
290     color[s] = GRAY
291     distance[s] = 0
292     while queue:
293         n = queue.pop(0)
294         for v in graph[n]:
295             if color[v] == WHITE:
296                 color[v] = GRAY
297                 distance[v] = distance[n] + 1
298                 predecessor[v] = n
299                 queue.append(v)
300         color[n] = BLACK
301     return predecessor