# tahoe-lafs: src/allmydata/mutable/checker.py
# (captured from the gitweb view at git.rkrishnan.org; commit message:
#  "Overhaul IFilesystemNode handling, to simplify tests and use POLA
#  internally.")
1
2 from twisted.internet import defer
3 from twisted.python import failure
4 from allmydata import hashtree
5 from allmydata.uri import from_string
6 from allmydata.util import hashutil, base32, idlib, log
7 from allmydata.check_results import CheckAndRepairResults, CheckResults
8
9 from common import MODE_CHECK, CorruptShareError
10 from servermap import ServerMap, ServermapUpdater
11 from layout import unpack_share, SIGNED_PREFIX_LENGTH
12
class MutableChecker:
    """Check the health of a single mutable file.

    Drives a MODE_CHECK servermap update against the storage servers,
    optionally verifies every byte of every share of the best recoverable
    version, and fills a CheckResults instance describing what was found.
    The Deferred returned by check() fires with that CheckResults.
    """

    def __init__(self, node, storage_broker, history, monitor):
        # node: the mutable filenode to check (must offer get_uri(),
        #       get_storage_index(), is_readonly(), ...)
        # storage_broker: gives ServermapUpdater access to storage servers
        # history: optional stats/history object; if set, it is notified of
        #          the mapupdate operation's status
        # monitor: consulted between steps so a cancelled check stops early
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (nodeid,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.results = CheckResults(from_string(node.get_uri()), self._storage_index)
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
        """Update the servermap and evaluate this file's health.

        If verify=True, also read back every share of the best recoverable
        version in full and validate its hashes/signature data. If
        add_lease=True, the mapupdate adds/renews a lease on each share it
        touches. Returns a Deferred that fires with self.results (a
        CheckResults).
        """
        servermap = ServerMap()
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, MODE_CHECK, add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        # _verify_all_shares does not return the servermap, so re-inject it
        # before filling in the results
        d.addCallback(lambda res: servermap)
        d.addCallback(self._fill_checker_results, self.results)
        d.addCallback(lambda res: self.results)
        return d

    def _got_mapupdate_results(self, servermap):
        # the file is healthy if there is exactly one recoverable version, it
        # has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        if servermap.unrecoverable_versions():
            self.need_repair = True
        if num_recoverable != 1:
            # zero recoverable versions, or several competing ones
            self.need_repair = True
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                # recoverable, but missing some of the N expected shares
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
        # read every byte of each share
        if not self.best_version:
            # nothing recoverable, so there is nothing to verify
            return
        versionmap = servermap.make_versionmap()
        shares = versionmap[self.best_version]
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.best_version
        offsets = dict(offsets_tuple)
        # a single read vector covering the entire share, [0, EOF)
        readv = [ (0, offsets["EOF"]) ]
        dl = []
        for (shnum, peerid, timestamp) in shares:
            ss = servermap.connections[peerid]
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
            d.addCallback(self._got_answer, peerid, servermap)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        # isolate the callRemote to a separate method, so tests can subclass
        # Publish and override it
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_answer(self, datavs, peerid, servermap):
        # datavs maps shnum -> list of read results, one per readv entry;
        # we issued a single full-share read, so datav[0] is the whole share
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                self._got_results_one_share(shnum, peerid, data)
            except CorruptShareError:
                # record the corruption, mark the share bad in the servermap,
                # and advise the hosting server of the problem
                f = failure.Failure()
                self.need_repair = True
                self.bad_shares.append( (peerid, shnum, f) )
                prefix = data[:SIGNED_PREFIX_LENGTH]
                servermap.mark_bad_share(peerid, shnum, prefix)
                ss = servermap.connections[peerid]
                self.notify_server_corruption(ss, shnum, str(f.value))

    def check_prefix(self, peerid, shnum, data):
        """Raise CorruptShareError if the share's signed prefix no longer
        matches the version we selected during the mapupdate."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.best_version
        got_prefix = data[:SIGNED_PREFIX_LENGTH]
        if got_prefix != prefix:
            raise CorruptShareError(peerid, shnum,
                                    "prefix mismatch: share changed while we were reading it")

    def _got_results_one_share(self, shnum, peerid, data):
        """Fully validate one share's data, raising CorruptShareError on any
        hash-tree, share-hash-chain, or (when writable) privkey mismatch."""
        self.check_prefix(peerid, shnum, data)

        # the [seqnum:signature] pieces are validated by _compare_prefix,
        # which checks their signature against the pubkey known to be
        # associated with this file.

        (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
         share_hash_chain, block_hash_tree, share_data,
         enc_privkey) = unpack_share(data)

        # validate [share_hash_chain,block_hash_tree,share_data]

        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            # this verifies our share's leaf against the signed root
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError), e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)

        # validate enc_privkey: only possible if we have a write-cap
        if not self._node.is_readonly():
            alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
            alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
            if alleged_writekey != self._node.get_writekey():
                raise CorruptShareError(peerid, shnum, "invalid privkey")

    def notify_server_corruption(self, ss, shnum, reason):
        # callRemoteOnly: best-effort advisory, we do not wait for a response
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _count_shares(self, smap, version):
        """Return the count-shares-*/count-good-share-hosts/
        count-wrong-shares counters for the given version."""
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_peers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        # shares that belong to some other version of the file
        counters["count-wrong-shares"] = sum([len(shares)
                                          for verinfo,shares in vmap.items()
                                          if verinfo != version])

        return counters

    def _fill_checker_results(self, smap, r):
        """Populate CheckResults 'r' (healthy/recoverable flags, counters,
        report, summary) from servermap 'smap' and self.bad_shares."""
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        data = {}
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()
        data["count-recoverable-versions"] = len(recoverable)
        data["count-unrecoverable-versions"] = len(unrecoverable)

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        # any of the following three conditions marks the file unhealthy
        if smap.unrecoverable_versions():
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            data.update(counters)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_peers_for_version(best_version)
            needs_rebalancing = bool( len(hosts) < N )
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            data.update(self._count_shares(smap, first))
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the bigger problem
        else:
            # couldn't find anything at all
            data["count-shares-good"] = 0
            data["count-shares-needed"] = 3 # arbitrary defaults
            data["count-shares-expected"] = 10
            data["count-good-share-hosts"] = 0
            data["count-wrong-shares"] = 0

        if self.bad_shares:
            data["count-corrupt-shares"] = len(self.bad_shares)
            data["list-corrupt-shares"] = locators = []
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
            for (peerid, shnum, f) in sorted(self.bad_shares):
                locators.append( (peerid, self._storage_index, shnum) )
                s = "%s-sh%d" % (idlib.shortnodeid_b2a(peerid), shnum)
                # prefer the CorruptShareError's reason; otherwise the
                # stringified failure
                if f.check(CorruptShareError):
                    ft = f.value.reason
                else:
                    ft = str(f)
                report.append(" %s: %s" % (s, ft))
                summary.append(s)
                p = (peerid, self._storage_index, shnum, f)
                r.problems.append(p)
                msg = ("CorruptShareError during mutable verify, "
                       "peerid=%(peerid)s, si=%(si)s, shnum=%(shnum)d, "
                       "where=%(where)s")
                log.msg(format=msg, peerid=idlib.nodeid_b2a(peerid),
                        si=base32.b2a(self._storage_index),
                        shnum=shnum,
                        where=ft,
                        level=log.WEIRD, umid="EkK8QA")
        else:
            data["count-corrupt-shares"] = 0
            data["list-corrupt-shares"] = []

        # sharemap: "<version-summary>-sh<num>" -> list of peerids holding it
        sharemap = {}
        for verinfo in vmap:
            for (shnum, peerid, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                if shareid not in sharemap:
                    sharemap[shareid] = []
                sharemap[shareid].append(peerid)
        data["sharemap"] = sharemap
        data["servers-responding"] = list(smap.reachable_peers)

        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        r.set_data(data)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        r.set_report(report)
281
282
class MutableCheckAndRepairer(MutableChecker):
    """Check a mutable file and, when it needs repair and we hold a
    write-cap, attempt the repair too.

    The Deferred from check() fires with a CheckAndRepairResults whose
    pre_repair_results/post_repair_results bracket any repair performed.
    """
    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.cr_results.pre_repair_results = self.results
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
        """Run the base health check, then maybe repair; the Deferred
        fires with our CheckAndRepairResults."""
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda ignored: self.cr_results)
        return d

    def _maybe_repair(self, res):
        self._monitor.raise_if_cancelled()
        cr = self.cr_results
        if not self.need_repair:
            # healthy enough: post-repair results are the pre-repair ones
            cr.post_repair_results = self.results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            cr.post_repair_results = self.results
            cr.repair_attempted = False
            return
        cr.repair_attempted = True
        d = self._node.repair(self.results)
        def _repaired(repair_results):
            cr.repair_successful = True
            post = CheckResults(from_string(self._node.get_uri()), self._storage_index)
            cr.post_repair_results = post
            self._fill_checker_results(repair_results.servermap, post)
            cr.repair_results = repair_results # TODO?
        def _repair_failed(f):
            # I'm not sure if I want to pass through a failure or not.
            cr.repair_successful = False
            cr.repair_failure = f # TODO?
            #cr.post_repair_results = ??
            return f
        d.addCallbacks(_repaired, _repair_failed)
        return d