# src/allmydata/mutable/checker.py
# commit: mutable: train checker and repairer to work with MDMF mutable files

from allmydata.uri import from_string
from allmydata.util import base32, idlib, log
from allmydata.check_results import CheckAndRepairResults, CheckResults

from allmydata.mutable.common import MODE_CHECK, CorruptShareError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve # for verifying

class MutableChecker:

    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (nodeid,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.results = CheckResults(from_string(node.get_uri()), self._storage_index)
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares, and getting a good idea of
        # recoverability, etc, without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, MODE_CHECK, add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(lambda res: servermap)
        d.addCallback(self._fill_checker_results, self.results)
        d.addCallback(lambda res: self.results)
        return d
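
    # Usage sketch (an assumption about the calling side, not code taken
    # from this module): a mutable filenode would typically drive the
    # checker like
    #
    #   checker = MutableChecker(node, storage_broker, history, monitor)
    #   d = checker.check(verify=True, add_lease=False)
    #   d.addCallback(lambda check_results: ...)  # the CheckResults built above
    #
    # The Deferred returned by check() fires with self.results once the
    # servermap update (and, when verify=True, the full share download and
    # verification) has completed.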

    def _got_mapupdate_results(self, servermap):
        # the file is healthy if there is exactly one recoverable version, it
        # has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There isn't exactly one recoverable version.
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap
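
    # Worked example (illustrative numbers only): with a 3-of-10 encoding,
    # exactly one recoverable version, and 7 distinct shares found,
    # shares_available() maps the best version to (7, 3, 10); since
    # num_distinct_shares (7) < N (10), need_repair is set even though the
    # file is still recoverable (7 >= k).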

    def _verify_all_shares(self, servermap):
        # read every byte of each share
        #
        # This logic is going to be very nearly the same as the
        # downloader. I bet we could pass the downloader a flag that
        # makes it do this, and piggyback onto that instead of
        # duplicating a bunch of code.
        #
        # Like:
        #  r = Retrieve(blah, blah, blah, verify=True)
        #  d = r.download()
        #  (wait, wait, wait, d.callback)
        #
        #  Then, when it has finished, we can check the servermap (which
        #  we provided to Retrieve) to figure out which shares are bad,
        #  since the Retrieve process will have updated the servermap as
        #  it went along.
        #
        #  By passing the verify=True flag to the constructor, we are
        #  telling the downloader a few things.
        #
        #  1. It needs to download all N shares, not just K shares.
        #  2. It doesn't need to decrypt or decode the shares, only
        #     verify them.
        if not self.best_version:
            return

        r = Retrieve(self._node, servermap, self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d


    def _process_bad_shares(self, bad_shares):
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares


    def _count_shares(self, smap, version):
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_peers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                          for verinfo,shares in vmap.items()
                                          if verinfo != version])

        return counters
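
    # For the 3-of-10 example above, with the 7 good shares spread over,
    # say, 5 servers and no shares belonging to other versions, the
    # returned dictionary would look like (illustrative values only):
    #
    #   {"count-shares-good": 7,
    #    "count-shares-needed": 3,
    #    "count-shares-expected": 10,
    #    "count-good-share-hosts": 5,
    #    "count-wrong-shares": 0}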

    def _fill_checker_results(self, smap, r):
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        data = {}
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()
        data["count-recoverable-versions"] = len(recoverable)
        data["count-unrecoverable-versions"] = len(unrecoverable)

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        if smap.unrecoverable_versions():
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            data.update(counters)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_peers_for_version(best_version)
            needs_rebalancing = bool( len(hosts) < N )
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            data.update(self._count_shares(smap, first))
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the bigger problem
        else:
            # couldn't find anything at all
            data["count-shares-good"] = 0
            data["count-shares-needed"] = 3 # arbitrary defaults
            data["count-shares-expected"] = 10
            data["count-good-share-hosts"] = 0
            data["count-wrong-shares"] = 0

        if self.bad_shares:
            data["count-corrupt-shares"] = len(self.bad_shares)
            data["list-corrupt-shares"] = locators = []
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
            for (peerid, shnum, f) in sorted(self.bad_shares):
                locators.append( (peerid, self._storage_index, shnum) )
                s = "%s-sh%d" % (idlib.shortnodeid_b2a(peerid), shnum)
                if f.check(CorruptShareError):
                    ft = f.value.reason
                else:
                    ft = str(f)
                report.append(" %s: %s" % (s, ft))
                summary.append(s)
                p = (peerid, self._storage_index, shnum, f)
                r.problems.append(p)
                msg = ("CorruptShareError during mutable verify, "
                       "peerid=%(peerid)s, si=%(si)s, shnum=%(shnum)d, "
                       "where=%(where)s")
                log.msg(format=msg, peerid=idlib.nodeid_b2a(peerid),
                        si=base32.b2a(self._storage_index),
                        shnum=shnum,
                        where=ft,
                        level=log.WEIRD, umid="EkK8QA")
        else:
            data["count-corrupt-shares"] = 0
            data["list-corrupt-shares"] = []

        sharemap = {}
        for verinfo in vmap:
            for (shnum, peerid, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                if shareid not in sharemap:
                    sharemap[shareid] = []
                sharemap[shareid].append(peerid)
        data["sharemap"] = sharemap
        data["servers-responding"] = list(smap.reachable_peers)

        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        r.set_data(data)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        r.set_report(report)


class MutableCheckAndRepairer(MutableChecker):
    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.cr_results.pre_repair_results = self.results
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d
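
    # Usage sketch (assumed caller pattern, not code from this module): the
    # check-and-repair variant is driven the same way as MutableChecker, but
    # its Deferred fires with a CheckAndRepairResults object, e.g.
    #
    #   checker = MutableCheckAndRepairer(node, storage_broker, history,
    #                                     monitor)
    #   d = checker.check(verify=False)
    #   d.addCallback(lambda cr: (cr.pre_repair_results,
    #                             cr.post_repair_results))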

    def _maybe_repair(self, res):
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            self.cr_results.post_repair_results = self.results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            self.cr_results.post_repair_results = self.results
            self.cr_results.repair_attempted = False
            return
        self.cr_results.repair_attempted = True
        d = self._node.repair(self.results)
        def _repair_finished(repair_results):
            self.cr_results.repair_successful = repair_results.get_successful()
            r = CheckResults(from_string(self._node.get_uri()), self._storage_index)
            self.cr_results.post_repair_results = r
            self._fill_checker_results(repair_results.servermap, r)
            self.cr_results.repair_results = repair_results # TODO?
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            self.cr_results.repair_successful = False
            self.cr_results.repair_failure = f # TODO?
            #self.cr_results.post_repair_results = ??
            return f
        d.addCallbacks(_repair_finished, _repair_error)
        return d