from allmydata.uri import from_string
from allmydata.util import base32, log
from allmydata.check_results import CheckAndRepairResults, CheckResults

from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve # for verifying
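
# Health-checking (and optional repair) for mutable files. These classes are
# typically driven by the mutable filenode's check()/check_and_repair()
# methods; a minimal usage sketch (assuming the usual Tahoe-LAFS wiring):
#
#   checker = MutableChecker(node, storage_broker, history, monitor)
#   d = checker.check(verify=True)   # Deferred -> CheckResults
#
#   repairer = MutableCheckAndRepairer(node, storage_broker, history, monitor)
#   d = repairer.check()             # Deferred -> CheckAndRepairResults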


class MutableChecker:
    SERVERMAP_MODE = MODE_CHECK
    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (server,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.results = CheckResults(from_string(node.get_uri()),
                                    self._storage_index)
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
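        # Update the servermap, optionally verify every share, then fill
        # our CheckResults; returns a Deferred that fires with the results.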
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares, and getting a good idea of
        # recoverability, etc, without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, self.SERVERMAP_MODE,
                             add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(lambda res: servermap)
        d.addCallback(self._fill_checker_results, self.results)
        d.addCallback(lambda res: self.results)
        return d

    def _got_mapupdate_results(self, servermap):
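        # Record the best recoverable version (if any) and decide whether
        # the file needs repair, based on the freshly updated servermap.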
        # the file is healthy if there is exactly one recoverable version, it
        # has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There isn't a recoverable version.
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
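        # Download and check every share of the best recoverable version,
        # handing any corrupt shares to _process_bad_shares.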
        # read every byte of each share
        #
        # This logic is going to be very nearly the same as the
        # downloader. I bet we could pass the downloader a flag that
        # makes it do this, and piggyback onto that instead of
        # duplicating a bunch of code.
        #
        # Like:
        #  r = Retrieve(blah, blah, blah, verify=True)
        #  d = r.download()
        #  (wait, wait, wait, d.callback)
        #
        # Then, when it has finished, we can check the servermap (which
        # we provided to Retrieve) to figure out which shares are bad,
        # since the Retrieve process will have updated the servermap as
        # necessary.
        #
        # By passing the verify=True flag to the constructor, we are
        # telling the downloader a few things.
        #
        # 1. It needs to download all N shares, not just K shares.
        # 2. It doesn't need to decrypt or decode the shares, only
        #    verify them.
        if not self.best_version:
            return

        r = Retrieve(self._node, self._storage_broker, servermap,
                     self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d

    def _process_bad_shares(self, bad_shares):
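        # Remember which shares failed verification; any bad share means
        # the file needs repair.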
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares

    def _count_shares(self, smap, version):
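        # Return a dict of share counters (count-shares-good/-needed/
        # -expected, count-good-share-hosts, count-wrong-shares) for the
        # given version.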
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_servers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                              for verinfo, shares in vmap.items()
                                              if verinfo != version])
        return counters

    def _fill_checker_results(self, smap, r):
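        # Populate the CheckResults instance 'r' from the servermap 'smap':
        # health flags, counters, corrupt-share list, sharemap, and a
        # human-readable report/summary.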
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        data = {}
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()
        data["count-recoverable-versions"] = len(recoverable)
        data["count-unrecoverable-versions"] = len(unrecoverable)

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        if smap.unrecoverable_versions():
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            data.update(counters)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_servers_for_version(best_version)
            needs_rebalancing = bool(len(hosts) < N)
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            data.update(self._count_shares(smap, first))
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the more important problem
        else:
            # couldn't find anything at all
            data["count-shares-good"] = 0
            data["count-shares-needed"] = 3 # arbitrary defaults
            data["count-shares-expected"] = 10
            data["count-good-share-hosts"] = 0
            data["count-wrong-shares"] = 0

        if self.bad_shares:
            data["count-corrupt-shares"] = len(self.bad_shares)
            data["list-corrupt-shares"] = locators = []
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
            for (server, shnum, f) in sorted(self.bad_shares):
                serverid = server.get_serverid()
                locators.append((serverid, self._storage_index, shnum))
                s = "%s-sh%d" % (server.get_name(), shnum)
                if f.check(CorruptShareError):
                    ft = f.value.reason
                else:
                    ft = str(f)
                report.append(" %s: %s" % (s, ft))
                summary.append(s)
                p = (serverid, self._storage_index, shnum, f)
                r.problems.append(p)
                msg = ("CorruptShareError during mutable verify, "
                       "serverid=%(serverid)s, si=%(si)s, shnum=%(shnum)d, "
                       "where=%(where)s")
                log.msg(format=msg, serverid=server.get_name(),
                        si=base32.b2a(self._storage_index),
                        shnum=shnum,
                        where=ft,
                        level=log.WEIRD, umid="EkK8QA")
        else:
            data["count-corrupt-shares"] = 0
            data["list-corrupt-shares"] = []

        sharemap = {}
        for verinfo in vmap:
            for (shnum, server, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                if shareid not in sharemap:
                    sharemap[shareid] = []
                sharemap[shareid].append(server.get_serverid())
        data["sharemap"] = sharemap
        data["servers-responding"] = [s.get_serverid() for s in
                                      list(smap.get_reachable_servers())]

        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        r.set_data(data)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        r.set_report(report)


class MutableCheckAndRepairer(MutableChecker):
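    # Check the file and, if it turns out to be unhealthy, try to repair it
    # in the same pass. The servermap is updated in MODE_WRITE so that the
    # private key is available for repair.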
    SERVERMAP_MODE = MODE_WRITE # needed to get the privkey

    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.cr_results.pre_repair_results = self.results
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
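        # Run the normal check, then repair if needed; the returned Deferred
        # fires with our CheckAndRepairResults.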
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d

    def _maybe_repair(self, res):
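        # If the check found problems (and the file is writeable), kick off
        # a repair and record the outcome in self.cr_results.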
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            self.cr_results.post_repair_results = self.results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            self.cr_results.post_repair_results = self.results
            self.cr_results.repair_attempted = False
            return
        self.cr_results.repair_attempted = True
        d = self._node.repair(self.results, monitor=self._monitor)
        def _repair_finished(repair_results):
            self.cr_results.repair_successful = repair_results.get_successful()
            r = CheckResults(from_string(self._node.get_uri()),
                             self._storage_index)
            self.cr_results.post_repair_results = r
            self._fill_checker_results(repair_results.servermap, r)
            self.cr_results.repair_results = repair_results # TODO?
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            self.cr_results.repair_successful = False
            self.cr_results.repair_failure = f # TODO?
            #self.cr_results.post_repair_results = ??
        d.addCallbacks(_repair_finished, _repair_error)
        return d