2 from twisted.internet import defer
3 from twisted.python import failure
4 from allmydata import hashtree
5 from allmydata.uri import from_string
6 from allmydata.util import hashutil, base32, idlib, log
7 from allmydata.check_results import CheckAndRepairResults, CheckResults
9 from common import MODE_CHECK, CorruptShareError
10 from servermap import ServerMap, ServermapUpdater
11 from layout import unpack_share, SIGNED_PREFIX_LENGTH
def __init__(self, node, storage_broker, history, monitor):
    """Prepare to check one mutable file.

    :param node: the mutable filenode to check; must provide get_uri()
        and get_storage_index()
    :param storage_broker: passed to ServermapUpdater to locate servers
    :param history: optional status/history object (may be None); when
        present it is notified of the mapupdate in check()
    :param monitor: cancellation monitor polled via raise_if_cancelled()
    """
    # BUG FIX: node was never stored, but self._node is read below and
    # throughout the checker (check(), _got_results_one_share(), ...).
    self._node = node
    self._storage_broker = storage_broker
    self._history = history
    self._monitor = monitor
    self.bad_shares = [] # list of (nodeid,shnum,failure)
    self._storage_index = self._node.get_storage_index()
    self.results = CheckResults(from_string(node.get_uri()), self._storage_index)
    self.need_repair = False
    self.responded = set() # set of (binary) nodeids
def check(self, verify=False, add_lease=False):
    """Check the health of this mutable file.

    Runs a MODE_CHECK servermap update; if verify=True, additionally
    downloads and validates every byte of every share of the best
    recoverable version. Returns a Deferred that fires with
    self.results (a CheckResults instance).
    """
    servermap = ServerMap()
    u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                         servermap, MODE_CHECK, add_lease=add_lease)
    if self._history:
        # history may be None; only notify when a status display exists
        self._history.notify_mapupdate(u.get_status())
    d = u.update()
    d.addCallback(self._got_mapupdate_results)
    if verify:
        # full byte-level verification is optional and expensive
        d.addCallback(self._verify_all_shares)
    d.addCallback(lambda res: servermap)
    d.addCallback(self._fill_checker_results, self.results)
    d.addCallback(lambda res: self.results)
    return d
41 def _got_mapupdate_results(self, servermap):
42 # the file is healthy if there is exactly one recoverable version, it
43 # has at least N distinct shares, and there are no unrecoverable
44 # versions: all existing shares will be for the same version.
45 self._monitor.raise_if_cancelled()
46 self.best_version = None
47 num_recoverable = len(servermap.recoverable_versions())
49 self.best_version = servermap.best_recoverable_version()
51 if servermap.unrecoverable_versions():
52 self.need_repair = True
53 if num_recoverable != 1:
54 self.need_repair = True
56 available_shares = servermap.shares_available()
57 (num_distinct_shares, k, N) = available_shares[self.best_version]
58 if num_distinct_shares < N:
59 self.need_repair = True
def _verify_all_shares(self, servermap):
    """Download and validate every byte of each share of the best version.

    Returns a DeferredList that errbacks on the first read failure (or
    None when there is no recoverable version). Per-share corruption is
    recorded by _got_answer rather than propagated.
    """
    # read every byte of each share
    if not self.best_version:
        # nothing recoverable: nothing to verify
        return
    versionmap = servermap.make_versionmap()
    shares = versionmap[self.best_version]
    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
     offsets_tuple) = self.best_version
    offsets = dict(offsets_tuple)
    # a single read vector covering the whole share, [0, EOF)
    readv = [ (0, offsets["EOF"]) ]
    dl = []
    for (shnum, peerid, timestamp) in shares:
        ss = servermap.connections[peerid]
        d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
        d.addCallback(self._got_answer, peerid, servermap)
        dl.append(d)
    return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
81 def _do_read(self, ss, peerid, storage_index, shnums, readv):
82 # isolate the callRemote to a separate method, so tests can subclass
83 # Publish and override it
84 d = ss.callRemote("slot_readv", storage_index, shnums, readv)
def _got_answer(self, datavs, peerid, servermap):
    """Validate each share returned by one server's slot_readv.

    datavs maps shnum -> list of read results (one element, since we
    issued a single readv covering the whole share). Corrupt shares are
    appended to self.bad_shares, marked bad in the servermap, and the
    server is advised of the corruption.
    """
    for shnum,datav in datavs.items():
        data = datav[0]  # single readv entry -> single result
        try:
            self._got_results_one_share(shnum, peerid, data)
        except CorruptShareError:
            # capture the active exception as a twisted Failure
            f = failure.Failure()
            self.need_repair = True
            self.bad_shares.append( (peerid, shnum, f) )
            prefix = data[:SIGNED_PREFIX_LENGTH]
            servermap.mark_bad_share(peerid, shnum, prefix)
            ss = servermap.connections[peerid]
            self.notify_server_corruption(ss, shnum, str(f.value))
def check_prefix(self, peerid, shnum, data):
    """Verify the share's signed prefix still matches self.best_version.

    Raises CorruptShareError when the share was modified between the
    mapupdate and this read.
    """
    (_seqnum, _root_hash, _IV, _segsize, _datalength, _k, _N,
     expected_prefix, _offsets_tuple) = self.best_version
    actual_prefix = data[:SIGNED_PREFIX_LENGTH]
    if actual_prefix == expected_prefix:
        return
    raise CorruptShareError(peerid, shnum,
                            "prefix mismatch: share changed while we were reading it")
def _got_results_one_share(self, shnum, peerid, data):
    """Fully validate the bytes of one share.

    Raises CorruptShareError on any hash-tree, prefix, or (when we hold
    a write-cap) private-key mismatch. Returns nothing on success.
    """
    self.check_prefix(peerid, shnum, data)

    # the [seqnum:signature] pieces are validated by _compare_prefix,
    # which checks their signature against the pubkey known to be
    # associated with this file.

    (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
     share_hash_chain, block_hash_tree, share_data,
     enc_privkey) = unpack_share(data)

    # validate [share_hash_chain,block_hash_tree,share_data]

    leaves = [hashutil.block_hash(share_data)]
    t = hashtree.HashTree(leaves)
    if list(t) != block_hash_tree:
        raise CorruptShareError(peerid, shnum, "block hash tree failure")
    share_hash_leaf = t[0]
    t2 = hashtree.IncompleteHashTree(N)
    # root_hash was checked by the signature
    t2.set_hashes({0: root_hash})
    try:
        t2.set_hashes(hashes=share_hash_chain,
                      leaves={shnum: share_hash_leaf})
    except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
            IndexError) as e:
        msg = "corrupt hashes: %s" % (e,)
        raise CorruptShareError(peerid, shnum, msg)

    # validate enc_privkey: only possible if we have a write-cap
    if not self._node.is_readonly():
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            raise CorruptShareError(peerid, shnum, "invalid privkey")
def notify_server_corruption(self, ss, shnum, reason):
    """Advise the storage server (fire-and-forget) that one of its
    mutable shares for this storage index is corrupt."""
    advisory = ("advise_corrupt_share", "mutable",
                self._storage_index, shnum, reason)
    ss.callRemoteOnly(*advisory)
149 def _count_shares(self, smap, version):
150 available_shares = smap.shares_available()
151 (num_distinct_shares, k, N) = available_shares[version]
153 counters["count-shares-good"] = num_distinct_shares
154 counters["count-shares-needed"] = k
155 counters["count-shares-expected"] = N
156 good_hosts = smap.all_peers_for_version(version)
157 counters["count-good-share-hosts"] = len(good_hosts)
158 vmap = smap.make_versionmap()
159 counters["count-wrong-shares"] = sum([len(shares)
160 for verinfo,shares in vmap.items()
161 if verinfo != version])
def _fill_checker_results(self, smap, r):
    """Populate CheckResults `r` from servermap `smap` plus the
    corruption noticed during verification (self.bad_shares).

    Fills r's servermap copy, healthy/recoverable/needs-rebalancing
    flags, the count-* data dict, the human-readable report, and the
    one-line summary.
    """
    self._monitor.raise_if_cancelled()
    r.set_servermap(smap.copy())
    healthy = True
    data = {}       # the count-* statistics dict
    report = []     # multi-line human-readable report
    summary = []    # fragments joined into the one-line summary
    vmap = smap.make_versionmap()
    recoverable = smap.recoverable_versions()
    unrecoverable = smap.unrecoverable_versions()
    data["count-recoverable-versions"] = len(recoverable)
    data["count-unrecoverable-versions"] = len(unrecoverable)

    if recoverable:
        report.append("Recoverable Versions: " +
                      "/".join(["%d*%s" % (len(vmap[v]),
                                           smap.summarize_version(v))
                                for v in recoverable]))
    if unrecoverable:
        report.append("Unrecoverable Versions: " +
                      "/".join(["%d*%s" % (len(vmap[v]),
                                           smap.summarize_version(v))
                                for v in unrecoverable]))
    if smap.unrecoverable_versions():
        healthy = False
        summary.append("some versions are unrecoverable")
        report.append("Unhealthy: some versions are unrecoverable")
    if len(recoverable) == 0:
        healthy = False
        summary.append("no versions are recoverable")
        report.append("Unhealthy: no versions are recoverable")
    if len(recoverable) > 1:
        healthy = False
        summary.append("multiple versions are recoverable")
        report.append("Unhealthy: there are multiple recoverable versions")

    needs_rebalancing = False
    if recoverable:
        best_version = smap.best_recoverable_version()
        report.append("Best Recoverable Version: " +
                      smap.summarize_version(best_version))
        counters = self._count_shares(smap, best_version)
        data.update(counters)
        s = counters["count-shares-good"]
        k = counters["count-shares-needed"]
        N = counters["count-shares-expected"]
        if s < N:
            healthy = False
            report.append("Unhealthy: best version has only %d shares "
                          "(encoding is %d-of-%d)" % (s, k, N))
            summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
        hosts = smap.all_peers_for_version(best_version)
        needs_rebalancing = bool( len(hosts) < N )
    elif unrecoverable:
        healthy = False
        # find a k and N from somewhere
        first = list(unrecoverable)[0]
        # not exactly the best version, but that doesn't matter too much
        data.update(self._count_shares(smap, first))
        # leave needs_rebalancing=False: the file being unrecoverable is
        # the bigger problem
    else:
        # couldn't find anything at all
        data["count-shares-good"] = 0
        data["count-shares-needed"] = 3 # arbitrary defaults
        data["count-shares-expected"] = 10
        data["count-good-share-hosts"] = 0
        data["count-wrong-shares"] = 0

    if self.bad_shares:
        data["count-corrupt-shares"] = len(self.bad_shares)
        data["list-corrupt-shares"] = locators = []
        report.append("Corrupt Shares:")
        summary.append("Corrupt Shares:")
        for (peerid, shnum, f) in sorted(self.bad_shares):
            locators.append( (peerid, self._storage_index, shnum) )
            s = "%s-sh%d" % (idlib.shortnodeid_b2a(peerid), shnum)
            if f.check(CorruptShareError):
                ft = f.value.reason
            else:
                ft = str(f)
            report.append(" %s: %s" % (s, ft))
            summary.append(s)
            p = (peerid, self._storage_index, shnum, f)
            r.problems.append(p)
            msg = ("CorruptShareError during mutable verify, "
                   "peerid=%(peerid)s, si=%(si)s, shnum=%(shnum)d, "
                   "where=%(where)s")
            log.msg(format=msg, peerid=idlib.nodeid_b2a(peerid),
                    si=base32.b2a(self._storage_index),
                    shnum=shnum,
                    where=ft,
                    level=log.WEIRD, umid="EkK8QA")
    else:
        data["count-corrupt-shares"] = 0
        data["list-corrupt-shares"] = []

    # sharemap: "versionsummary-shN" -> list of peerids holding it
    sharemap = {}
    for verinfo in vmap:
        for (shnum, peerid, timestamp) in vmap[verinfo]:
            shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
            if shareid not in sharemap:
                sharemap[shareid] = []
            sharemap[shareid].append(peerid)
    data["sharemap"] = sharemap
    data["servers-responding"] = list(smap.reachable_peers)

    r.set_healthy(healthy)
    r.set_recoverable(bool(recoverable))
    r.set_needs_rebalancing(needs_rebalancing)
    r.set_data(data)
    if healthy:
        r.set_summary("Healthy")
    else:
        r.set_summary("Unhealthy: " + " ".join(summary))
    r.set_report(report)
class MutableCheckAndRepairer(MutableChecker):
    """A MutableChecker that additionally attempts a repair when the
    check decides one is needed (and the node is writeable)."""

    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.cr_results.pre_repair_results = self.results
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
        """Run the base check, then maybe repair. Fires with
        self.cr_results (a CheckAndRepairResults)."""
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d

    def _maybe_repair(self, res):
        """Attempt a repair when needed; fill in post-repair results.

        Returns None when no repair is attempted, otherwise a Deferred
        that fires when the repair attempt has been recorded.
        """
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            # healthy: post-repair state is the same as pre-repair
            self.cr_results.post_repair_results = self.results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            self.cr_results.post_repair_results = self.results
            self.cr_results.repair_attempted = False
            return
        self.cr_results.repair_attempted = True
        d = self._node.repair(self.results)
        def _repair_finished(repair_results):
            self.cr_results.repair_successful = True
            r = CheckResults(from_string(self._node.get_uri()), self._storage_index)
            self.cr_results.post_repair_results = r
            self._fill_checker_results(repair_results.servermap, r)
            self.cr_results.repair_results = repair_results # TODO?
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            self.cr_results.repair_successful = False
            self.cr_results.repair_failure = f # TODO?
            #self.cr_results.post_repair_results = ??
        d.addCallbacks(_repair_finished, _repair_error)
        # BUG FIX: return the Deferred so check() waits for the repair
        return d