src/allmydata/mutable/checker.py (tahoe-lafs)
mutable/checker: refactor to make CheckResults easier to change

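# This module checks the health of a mutable file: it updates the file's
# servermap (optionally verifying every share by downloading it with
# verify=True), and summarizes what it found as a CheckResults, or as a
# CheckAndRepairResults when a repair is attempted as well.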

from allmydata.uri import from_string
from allmydata.util import base32, log
from allmydata.check_results import CheckAndRepairResults, CheckResults

from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve # for verifying

class MutableChecker:
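    # Check a single mutable file: update its servermap in SERVERMAP_MODE,
    # optionally verify every share, and report the outcome as a
    # CheckResults instance.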
    SERVERMAP_MODE = MODE_CHECK

    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (server,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
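        # Returns a Deferred that fires with a CheckResults instance. With
        # verify=True, every byte of every share is downloaded and checked;
        # with add_lease=True, the servermap update also adds a lease to
        # each share it touches.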
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares, and getting a good idea of
        # recoverability, etc, without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, self.SERVERMAP_MODE,
                             add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(lambda res: servermap)
        d.addCallback(self._make_checker_results)
        return d

    def _got_mapupdate_results(self, servermap):
        # the file is healthy if there is exactly one recoverable version, it
        # has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There isn't exactly one recoverable version.
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
        # Read every byte of each share.
        #
        # This logic is very nearly the same as the downloader's, so rather
        # than duplicating it we piggyback on the downloader: the code below
        # constructs a Retrieve with verify=True and calls download() on it.
        #
        # When the download has finished, we can check the servermap (which
        # we provided to Retrieve) to figure out which shares are bad, since
        # the Retrieve process will have updated the servermap as it went
        # along.
        #
        # Passing the verify=True flag to the constructor tells the
        # downloader a few things:
        #
        #  1. It needs to download all N shares, not just K shares.
        #  2. It doesn't need to decrypt or decode the shares, only
        #     verify them.
        if not self.best_version:
            return

        r = Retrieve(self._node, self._storage_broker, servermap,
                     self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d


    def _process_bad_shares(self, bad_shares):
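        # 'bad_shares' is the list of (server, shnum, failure) tuples
        # reported by the verifying Retrieve; any entry means the file
        # needs repair.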
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares


    def _count_shares(self, smap, version):
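        # Build the share-count portion of the results for one version: how
        # many distinct shares are good, how many are needed (k) and
        # expected (N), how many servers hold shares of it, and how many
        # shares belong to other versions.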
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_servers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                              for verinfo,shares in vmap.items()
                                              if verinfo != version])

        return counters

    def _make_checker_results(self, smap):
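        # Turn the servermap into a CheckResults instance: decide whether
        # the file is healthy, assemble a human-readable report and summary,
        # and record per-share counters, corrupt-share locators, and the
        # sharemap.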
        r = CheckResults(from_string(self._node.get_uri()),
                         self._storage_index)
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        if smap.unrecoverable_versions():
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_servers_for_version(best_version)
            needs_rebalancing = bool(len(hosts) < N)
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            counters = self._count_shares(smap, first)
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the bigger problem
        else:
            # couldn't find anything at all
            counters = {
                "count-shares-good": 0,
                "count-shares-needed": 3, # arbitrary defaults
                "count-shares-expected": 10,
                "count-good-share-hosts": 0,
                "count-wrong-shares": 0,
                }

        corrupt_share_locators = []
        if self.bad_shares:
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
        for (server, shnum, f) in sorted(self.bad_shares):
            serverid = server.get_serverid()
            locator = (serverid, self._storage_index, shnum)
            corrupt_share_locators.append(locator)
            s = "%s-sh%d" % (server.get_name(), shnum)
            if f.check(CorruptShareError):
                ft = f.value.reason
            else:
                ft = str(f)
            report.append(" %s: %s" % (s, ft))
            summary.append(s)
            p = (serverid, self._storage_index, shnum, f)
            r.problems.append(p)
            msg = ("CorruptShareError during mutable verify, "
                   "serverid=%(serverid)s, si=%(si)s, shnum=%(shnum)d, "
                   "where=%(where)s")
            log.msg(format=msg, serverid=server.get_name(),
                    si=base32.b2a(self._storage_index),
                    shnum=shnum,
                    where=ft,
                    level=log.WEIRD, umid="EkK8QA")

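        # sharemap maps "versionsummary-shN" share identifiers to the list
        # of server ids that hold that share.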
        sharemap = {}
        for verinfo in vmap:
            for (shnum, server, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                if shareid not in sharemap:
                    sharemap[shareid] = []
                sharemap[shareid].append(server.get_serverid())
        servers_responding = [s.get_serverid() for s in
                              list(smap.get_reachable_servers())]
        r.set_data(
            count_shares_needed=counters["count-shares-needed"],
            count_shares_expected=counters["count-shares-expected"],
            count_shares_good=counters["count-shares-good"],
            count_good_share_hosts=counters["count-good-share-hosts"],
            count_recoverable_versions=len(recoverable),
            count_unrecoverable_versions=len(unrecoverable),
            servers_responding=servers_responding,
            sharemap=sharemap,
            count_wrong_shares=counters["count-wrong-shares"],
            list_corrupt_shares=corrupt_share_locators,
            count_corrupt_shares=len(corrupt_share_locators),
            list_incompatible_shares=[],
            count_incompatible_shares=0,
            )

        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        r.set_report(report)
        return r


class MutableCheckAndRepairer(MutableChecker):
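    # Like MutableChecker, but if the check decides the file needs repair
    # (and the node is writable), attempt the repair and report both pre-
    # and post-repair results as a CheckAndRepairResults.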
    SERVERMAP_MODE = MODE_WRITE # needed to get the privkey

    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
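        # Returns a Deferred that fires with a CheckAndRepairResults
        # instance holding the pre-repair results and, if a repair was
        # attempted, the post-repair results as well.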
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._stash_pre_repair_results)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d

    def _stash_pre_repair_results(self, pre_repair_results):
        self.cr_results.pre_repair_results = pre_repair_results
        return pre_repair_results

    def _maybe_repair(self, pre_repair_results):
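        # If the check found problems and the node is writable, run a repair
        # and then re-derive checker results from the repairer's servermap;
        # otherwise reuse the pre-repair results.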
        crr = self.cr_results
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            crr.post_repair_results = pre_repair_results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            crr.post_repair_results = pre_repair_results
            crr.repair_attempted = False
            return
        crr.repair_attempted = True
        d = self._node.repair(pre_repair_results, monitor=self._monitor)
        def _repair_finished(rr):
            crr.repair_successful = rr.get_successful()
            crr.post_repair_results = self._make_checker_results(rr.servermap)
            crr.repair_results = rr # TODO?
            return
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            crr.repair_successful = False
            crr.repair_failure = f # TODO?
            #crr.post_repair_results = ??
            return f
        d.addCallbacks(_repair_finished, _repair_error)
        return d
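
# Usage sketch (illustrative only; the names below are placeholders for
# whatever the calling filenode actually has in scope, not code from this
# file): a mutable filenode would drive these classes roughly like this.
#
#   def check(self, monitor, verify=False, add_lease=False):
#       checker = MutableChecker(self, storage_broker, history, monitor)
#       return checker.check(verify, add_lease)  # Deferred -> CheckResults
#
#   def check_and_repair(self, monitor, verify=False, add_lease=False):
#       checker = MutableCheckAndRepairer(self, storage_broker, history,
#                                         monitor)
#       return checker.check(verify, add_lease)  # Deferred -> CheckAndRepairResults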