from allmydata.uri import from_string
from allmydata.util import base32, log
from allmydata.check_results import CheckAndRepairResults, CheckResults

from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve # for verifying


class MutableChecker(object):
    SERVERMAP_MODE = MODE_CHECK
    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (server,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids
    def check(self, verify=False, add_lease=False):
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares and getting a good idea of
        # recoverability, etc., without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, self.SERVERMAP_MODE,
                             add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(lambda res: servermap)
        d.addCallback(self._make_checker_results)
        return d
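
    # A minimal sketch of driving the checker, assuming a mutable filenode,
    # a storage broker, and a Monitor supplied by the surrounding
    # application (`node`, `broker`, and `monitor` are hypothetical names,
    # not defined in this module):
    #
    #   checker = MutableChecker(node, broker, history=None, monitor=monitor)
    #   d = checker.check(verify=True)
    #   d.addCallback(lambda cr: cr.get_summary())
    #
    # check() returns a Deferred that fires with the CheckResults instance
    # built by _make_checker_results() below.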
    def _got_mapupdate_results(self, servermap):
        # the file is healthy if there is exactly one recoverable version, it
        # has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There isn't exactly one recoverable version.
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap
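
    # A worked example of these rules, with illustrative numbers (not taken
    # from any real deployment): under 3-of-10 encoding (k=3, N=10), a file
    # whose single recoverable version has 7 distinct shares is recoverable
    # (7 >= k) but still needs repair (7 < N). If shares of a second,
    # unrecoverable version are also present, need_repair is set for that
    # reason as well, and the file cannot be reported healthy.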
    def _verify_all_shares(self, servermap):
        # read every byte of each share
        #
        # This logic is going to be very nearly the same as the
        # downloader. I bet we could pass the downloader a flag that
        # makes it do this, and piggyback onto that instead of
        # duplicating a bunch of code.
        #
        # Like:
        #  r = Retrieve(blah, blah, blah, verify=True)
        #  d = r.download()
        #  (wait, wait, wait, d.callback)
        #
        # Then, when it has finished, we can check the servermap (which
        # we provided to Retrieve) to figure out which shares are bad,
        # since the Retrieve process will have updated the servermap as
        # necessary.
        #
        # By passing the verify=True flag to the constructor, we are
        # telling the downloader a few things.
        #
        # 1. It needs to download all N shares, not just K shares.
        # 2. It doesn't need to decrypt or decode the shares, only
        #    verify them.
        if not self.best_version:
            return

        r = Retrieve(self._node, self._storage_broker, servermap,
                     self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d
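
    # Judging from the bad_shares comment in __init__ above, the verify-mode
    # download fires its Deferred with a list of (server, shnum, failure)
    # tuples, one per share that failed verification, e.g. (hypothetical
    # values):
    #
    #   [(<server xgru5>, 2, Failure(CorruptShareError(...)))]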
    def _process_bad_shares(self, bad_shares):
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares
    def _count_shares(self, smap, version):
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_servers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                              for verinfo,shares in vmap.items()
                                              if verinfo != version])
        return counters
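
    # Example of the returned dict for a hypothetical 3-of-10 file with 8
    # distinct shares of the queried version spread across 5 servers, plus
    # 2 shares belonging to some other version (all numbers illustrative):
    #
    #   {"count-shares-good": 8,
    #    "count-shares-needed": 3,
    #    "count-shares-expected": 10,
    #    "count-good-share-hosts": 5,
    #    "count-wrong-shares": 2}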
    def _make_checker_results(self, smap):
        r = CheckResults(from_string(self._node.get_uri()),
                         self._storage_index)
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        if smap.unrecoverable_versions():
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")
        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_servers_for_version(best_version)
            needs_rebalancing = bool(len(hosts) < N)
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            counters = self._count_shares(smap, first)
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the bigger problem
        else:
            # couldn't find anything at all
            counters = {
                "count-shares-good": 0,
                "count-shares-needed": 3, # arbitrary defaults
                "count-shares-expected": 10,
                "count-good-share-hosts": 0,
                "count-wrong-shares": 0,
                }
        corrupt_share_locators = []
        if self.bad_shares:
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
        for (server, shnum, f) in sorted(self.bad_shares):
            serverid = server.get_serverid()
            locator = (serverid, self._storage_index, shnum)
            corrupt_share_locators.append(locator)
            s = "%s-sh%d" % (server.get_name(), shnum)
            if f.check(CorruptShareError):
                ft = f.value.reason
            else:
                ft = str(f)
            report.append(" %s: %s" % (s, ft))
            summary.append(s)
            p = (serverid, self._storage_index, shnum, f)
            r.problems.append(p)
            msg = ("CorruptShareError during mutable verify, "
                   "serverid=%(serverid)s, si=%(si)s, shnum=%(shnum)d, "
                   "where=%(where)s")
            log.msg(format=msg, serverid=server.get_name(),
                    si=base32.b2a(self._storage_index),
                    shnum=shnum,
                    where=ft,
                    level=log.WEIRD, umid="EkK8QA")
        sharemap = {}
        for verinfo in vmap:
            for (shnum, server, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                if shareid not in sharemap:
                    sharemap[shareid] = []
                sharemap[shareid].append(server.get_serverid())
        servers_responding = [s.get_serverid() for s in
                              list(smap.get_reachable_servers())]
        r.set_data(
            count_shares_needed=counters["count-shares-needed"],
            count_shares_expected=counters["count-shares-expected"],
            count_shares_good=counters["count-shares-good"],
            count_good_share_hosts=counters["count-good-share-hosts"],
            count_recoverable_versions=len(recoverable),
            count_unrecoverable_versions=len(unrecoverable),
            servers_responding=servers_responding,
            sharemap=sharemap,
            count_wrong_shares=counters["count-wrong-shares"],
            list_corrupt_shares=corrupt_share_locators,
            count_corrupt_shares=len(corrupt_share_locators),
            list_incompatible_shares=[],
            count_incompatible_shares=0)
        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        return r
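
    # A sketch of consuming the finished CheckResults, using accessors from
    # the ICheckResults interface (`d` here is the Deferred returned by
    # check(), as in the sketch after check() above):
    #
    #   def _display(cr):
    #       print cr.get_summary()        # e.g. "Healthy"
    #       for line in cr.get_report():
    #           print line
    #   d.addCallback(_display)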

class MutableCheckAndRepairer(MutableChecker):
    SERVERMAP_MODE = MODE_WRITE # needed to get the privkey

    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._stash_pre_repair_results)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d

    def _stash_pre_repair_results(self, pre_repair_results):
        self.cr_results.pre_repair_results = pre_repair_results
        return pre_repair_results
    def _maybe_repair(self, pre_repair_results):
        crr = self.cr_results
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            crr.post_repair_results = pre_repair_results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            crr.post_repair_results = pre_repair_results
            crr.repair_attempted = False
            return
        crr.repair_attempted = True
        d = self._node.repair(pre_repair_results, monitor=self._monitor)
        def _repair_finished(rr):
            crr.repair_successful = rr.get_successful()
            crr.post_repair_results = self._make_checker_results(rr.servermap)
            crr.repair_results = rr # TODO?
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            crr.repair_successful = False
            crr.repair_failure = f # TODO?
            #crr.post_repair_results = ??
        d.addCallbacks(_repair_finished, _repair_error)
        return d
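
# A minimal sketch of driving check-and-repair, under the same assumptions
# as the MutableChecker sketch above (`node`, `broker`, and `monitor` are
# hypothetical names from the surrounding application); the attributes read
# here are the ones _maybe_repair() sets:
#
#   checker = MutableCheckAndRepairer(node, broker, history=None,
#                                     monitor=monitor)
#   d = checker.check(verify=True)
#   def _done(crr):
#       if crr.repair_attempted:
#           print "repair successful:", crr.repair_successful
#       return crr.post_repair_results.get_summary()
#   d.addCallback(_done)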