# src/allmydata/mutable/checker.py
from allmydata.uri import from_string
from allmydata.util import base32, log
from allmydata.check_results import CheckAndRepairResults, CheckResults

from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve # for verifying

class MutableChecker:
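    """Check the health of a mutable file: update its servermap to learn
    which versions and shares exist, and optionally download and verify
    every share of the best recoverable version."""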
    SERVERMAP_MODE = MODE_CHECK

    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (server,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.results = CheckResults(from_string(node.get_uri()), self._storage_index)
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
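        """Update the servermap in self.SERVERMAP_MODE and evaluate the
        file's health. If 'verify' is True, also download and verify every
        share of the best recoverable version. Returns a Deferred that fires
        with a CheckResults instance."""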
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares and getting a good idea of
        # recoverability, etc., without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, self.SERVERMAP_MODE,
                             add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(lambda res: servermap)
        d.addCallback(self._fill_checker_results, self.results)
        d.addCallback(lambda res: self.results)
        return d

    def _got_mapupdate_results(self, servermap):
        # The file is healthy if there is exactly one recoverable version,
        # that version has at least N distinct shares, and there are no
        # unrecoverable versions: all existing shares will then be for the
        # same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There isn't exactly one recoverable version.
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
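        """Download and verify every share of the best recoverable version,
        recording any corrupt shares in self.bad_shares. Returns a Deferred
        that fires when verification is done, or None if there is no
        recoverable version."""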
        # read every byte of each share
        #
        # This logic is going to be very nearly the same as the
        # downloader. I bet we could pass the downloader a flag that
        # makes it do this, and piggyback onto that instead of
        # duplicating a bunch of code.
        #
        # Like:
        #  r = Retrieve(blah, blah, blah, verify=True)
        #  d = r.download()
        #  (wait, wait, wait, d.callback)
        #
        #  Then, when it has finished, we can check the servermap (which
        #  we provided to Retrieve) to figure out which shares are bad,
        #  since the Retrieve process will have updated the servermap as
        #  it went along.
        #
        #  By passing the verify=True flag to the constructor, we are
        #  telling the downloader a few things.
        #
        #  1. It needs to download all N shares, not just K shares.
        #  2. It doesn't need to decrypt or decode the shares, only
        #     verify them.
        if not self.best_version:
            return

        r = Retrieve(self._node, self._storage_broker, servermap,
                     self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d


    def _process_bad_shares(self, bad_shares):
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares


    def _count_shares(self, smap, version):
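        """Return a dict of share-count statistics (good, needed, expected,
        good hosts, wrong-version shares) for the given version, derived
        from the servermap."""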
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_servers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                              for verinfo,shares in vmap.items()
                                              if verinfo != version])

        return counters

    def _fill_checker_results(self, smap, r):
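        """Populate the CheckResults instance 'r' from the servermap 'smap':
        version counts, share counters, corrupt-share details, a sharemap,
        and a human-readable report and summary."""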
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        data = {}
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()
        data["count-recoverable-versions"] = len(recoverable)
        data["count-unrecoverable-versions"] = len(unrecoverable)

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        if unrecoverable:
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            data.update(counters)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_servers_for_version(best_version)
            needs_rebalancing = bool( len(hosts) < N )
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            data.update(self._count_shares(smap, first))
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the bigger problem
        else:
            # couldn't find anything at all
            data["count-shares-good"] = 0
            data["count-shares-needed"] = 3 # arbitrary defaults
            data["count-shares-expected"] = 10
            data["count-good-share-hosts"] = 0
            data["count-wrong-shares"] = 0

        if self.bad_shares:
            data["count-corrupt-shares"] = len(self.bad_shares)
            data["list-corrupt-shares"] = locators = []
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
            for (server, shnum, f) in sorted(self.bad_shares):
                serverid = server.get_serverid()
                locators.append( (serverid, self._storage_index, shnum) )
                s = "%s-sh%d" % (server.get_name(), shnum)
                if f.check(CorruptShareError):
                    ft = f.value.reason
                else:
                    ft = str(f)
                report.append(" %s: %s" % (s, ft))
                summary.append(s)
                p = (serverid, self._storage_index, shnum, f)
                r.problems.append(p)
                msg = ("CorruptShareError during mutable verify, "
                       "serverid=%(serverid)s, si=%(si)s, shnum=%(shnum)d, "
                       "where=%(where)s")
                log.msg(format=msg, serverid=server.get_name(),
                        si=base32.b2a(self._storage_index),
                        shnum=shnum,
                        where=ft,
                        level=log.WEIRD, umid="EkK8QA")
        else:
            data["count-corrupt-shares"] = 0
            data["list-corrupt-shares"] = []

        sharemap = {}
        for verinfo in vmap:
            for (shnum, server, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                if shareid not in sharemap:
                    sharemap[shareid] = []
                sharemap[shareid].append(server.get_serverid())
        data["sharemap"] = sharemap
        data["servers-responding"] = [s.get_serverid() for s in
                                      list(smap.get_reachable_servers())]

        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        r.set_data(data)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        r.set_report(report)


class MutableCheckAndRepairer(MutableChecker):
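    """Check a mutable file as MutableChecker does, then attempt a repair
    if the check shows that one is needed."""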
    SERVERMAP_MODE = MODE_WRITE # needed to get the privkey

    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.cr_results.pre_repair_results = self.results
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
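        """Run the normal check, then repair the file if it needs it.
        Returns a Deferred that fires with a CheckAndRepairResults
        instance."""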
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d

    def _maybe_repair(self, res):
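        """If the check found problems and the node is writeable, invoke the
        node's repair() method and record the outcome in self.cr_results."""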
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            self.cr_results.post_repair_results = self.results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            self.cr_results.post_repair_results = self.results
            self.cr_results.repair_attempted = False
            return
        self.cr_results.repair_attempted = True
        d = self._node.repair(self.results, monitor=self._monitor)
        def _repair_finished(repair_results):
            self.cr_results.repair_successful = repair_results.get_successful()
            r = CheckResults(from_string(self._node.get_uri()), self._storage_index)
            self.cr_results.post_repair_results = r
            self._fill_checker_results(repair_results.servermap, r)
            self.cr_results.repair_results = repair_results # TODO?
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            self.cr_results.repair_successful = False
            self.cr_results.repair_failure = f # TODO?
            #self.cr_results.post_repair_results = ??
            return f
        d.addCallbacks(_repair_finished, _repair_error)
        return d