1 from foolscap.api import DeadReferenceError, RemoteException
2 from twisted.internet import defer
3 from allmydata import hashtree
4 from allmydata.check_results import CheckResults
5 from allmydata.immutable import download
6 from allmydata.uri import CHKFileVerifierURI
7 from allmydata.util.assertutil import precondition
8 from allmydata.util import base32, deferredutil, dictutil, log
9 from allmydata.util.hashutil import file_renewal_secret_hash, \
10 file_cancel_secret_hash, bucket_renewal_secret_hash, \
11 bucket_cancel_secret_hash
13 from allmydata.immutable import layout
class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available.

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """
36 def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
38 assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
39 assert precondition(isinstance(servers, (set, frozenset)), servers)
40 for (serverid, serverrref) in servers:
41 assert precondition(isinstance(serverid, str))
43 prefix = "%s" % base32.b2a_l(verifycap.storage_index[:8], 60)
44 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
46 self._verifycap = verifycap
48 self._monitor = monitor
49 self._servers = servers
50 self._verify = verify # bool: verify what the servers claim, or not?
51 self._add_lease = add_lease
53 self._share_hash_tree = None
55 frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
56 self._verifycap.storage_index)
57 self.file_renewal_secret = frs
58 fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
59 self._verifycap.storage_index)
60 self.file_cancel_secret = fcs
62 def _get_renewal_secret(self, peerid):
63 return bucket_renewal_secret_hash(self.file_renewal_secret, peerid)
64 def _get_cancel_secret(self, peerid):
65 return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
67 def _get_buckets(self, server, storageindex, serverid):
68 """Return a deferred that eventually fires with ({sharenum: bucket},
69 serverid, success). In case the server is disconnected or returns a
70 Failure then it fires with ({}, serverid, False) (A server
71 disconnecting or returning a Failure when we ask it for buckets is
72 the same, for our purposes, as a server that says it has none, except
73 that we want to track and report whether or not each server
76 d = server.callRemote("get_buckets", storageindex)
78 renew_secret = self._get_renewal_secret(serverid)
79 cancel_secret = self._get_cancel_secret(serverid)
80 d2 = server.callRemote("add_lease", storageindex,
81 renew_secret, cancel_secret)
82 dl = defer.DeferredList([d, d2], consumeErrors=True)
84 [(get_success, get_result),
85 (addlease_success, addlease_result)] = res
86 # ignore remote IndexError on the add_lease call. Propagate
87 # local errors and remote non-IndexErrors
90 if not addlease_result.check(RemoteException):
91 # Propagate local errors
92 return addlease_result
93 if addlease_result.value.failure.check(IndexError):
94 # tahoe=1.3.0 raised IndexError on non-existant
95 # buckets, which we ignore
97 # propagate remote errors that aren't IndexError, including
98 # the unfortunate internal KeyError bug that <1.3.0 had.
99 return addlease_result
100 dl.addCallback(_done)
103 def _wrap_results(res):
104 return (res, serverid, True)
108 if f.check(DeadReferenceError):
110 self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ")
111 return ({}, serverid, False)
113 d.addCallbacks(_wrap_results, _trap_errs)
116 def _download_and_verify(self, serverid, sharenum, bucket):
117 """Start an attempt to download and verify every block in this bucket
118 and return a deferred that will eventually fire once the attempt
121 If you download and verify every block then fire with (True,
122 sharenum, None), else if the share data couldn't be parsed because it
123 was of an unknown version number fire with (False, sharenum,
124 'incompatible'), else if any of the blocks were invalid, fire with
125 (False, sharenum, 'corrupt'), else if the server disconnected (False,
126 sharenum, 'disconnect'), else if the server returned a Failure during
127 the process fire with (False, sharenum, 'failure').
129 If there is an internal error such as an uncaught exception in this
130 code, then the deferred will errback, but if there is a remote error
131 such as the server failing or the returned data being incorrect then
132 it will not errback -- it will fire normally with the indicated
135 b = layout.ReadBucketProxy(bucket, serverid, self._verifycap.storage_index)
136 veup = download.ValidatedExtendedURIProxy(b, self._verifycap)
140 # We didn't succeed at fetching and verifying all the blocks of
141 # this share. Handle each reason for failure differently.
143 if f.check(DeadReferenceError):
144 return (False, sharenum, 'disconnect')
145 elif f.check(RemoteException):
146 return (False, sharenum, 'failure')
147 elif f.check(layout.ShareVersionIncompatible):
148 return (False, sharenum, 'incompatible')
149 elif f.check(layout.LayoutInvalid,
150 layout.RidiculouslyLargeURIExtensionBlock,
151 download.BadOrMissingHash,
152 download.BadURIExtensionHashValue):
153 return (False, sharenum, 'corrupt')
155 # if it wasn't one of those reasons, re-raise the error
159 self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
160 self._share_hash_tree.set_hashes({0: vup.share_root_hash})
162 vrbp = download.ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, vup.num_segments, vup.block_size, vup.share_size)
165 for blocknum in range(vup.num_segments):
166 def _discard_result(r):
167 assert isinstance(r, str), r
170 d2 = vrbp.get_block(blocknum)
171 d2.addCallback(_discard_result)
174 dl = deferredutil.gatherResults(ds)
175 # dl will fire once every block of this share has been downloaded
176 # and verified, or else it will errback.
179 return (True, sharenum, None)
184 d.addCallback(_got_ueb)
189 def _verify_server_shares(self, serverid, ss):
190 """ Return a deferred which eventually fires with a tuple of
191 (set(sharenum), serverid, set(corruptsharenum),
192 set(incompatiblesharenum), success) showing all the shares verified
193 to be served by this server, and all the corrupt shares served by the
194 server, and all the incompatible shares served by the server. In case
195 the server is disconnected or returns a Failure then it fires with
196 the last element False.
198 A server disconnecting or returning a failure when we ask it for
199 shares is the same, for our purposes, as a server that says it has
200 none or offers invalid ones, except that we want to track and report
201 the server's behavior. Similarly, the presence of corrupt shares is
202 mainly of use for diagnostics -- you can typically treat it as just
203 like being no share at all by just observing its absence from the
204 verified shares dict and ignoring its presence in the corrupt shares
207 The 'success' argument means whether the server responded to *any*
208 queries during this process, so if it responded to some queries and
209 then disconnected and ceased responding, or returned a failure, it is
210 still marked with the True flag for 'success'.
212 d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
214 def _got_buckets(result):
215 bucketdict, serverid, success = result
218 for (sharenum, bucket) in bucketdict.items():
219 d = self._download_and_verify(serverid, sharenum, bucket)
222 dl = deferredutil.gatherResults(shareverds)
224 def collect(results):
228 for succ, sharenum, whynot in results:
230 verified.add(sharenum)
232 if whynot == 'corrupt':
233 corrupt.add(sharenum)
234 elif whynot == 'incompatible':
235 incompatible.add(sharenum)
236 return (verified, serverid, corrupt, incompatible, success)
238 dl.addCallback(collect)
242 f.trap(RemoteException, DeadReferenceError)
243 return (set(), serverid, set(), set(), False)
245 d.addCallbacks(_got_buckets, _err)
248 def _check_server_shares(self, serverid, ss):
249 """Return a deferred which eventually fires with a tuple of
250 (set(sharenum), serverid, set(), set(), responded) showing all the
251 shares claimed to be served by this server. In case the server is
252 disconnected then it fires with (set() serverid, set(), set(), False)
253 (a server disconnecting when we ask it for buckets is the same, for
254 our purposes, as a server that says it has none, except that we want
255 to track and report whether or not each server responded.)"""
256 def _curry_empty_corrupted(res):
257 buckets, serverid, responded = res
258 return (set(buckets), serverid, set(), set(), responded)
259 d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
260 d.addCallback(_curry_empty_corrupted)
263 def _format_results(self, results):
264 cr = CheckResults(self._verifycap, self._verifycap.storage_index)
266 d['count-shares-needed'] = self._verifycap.needed_shares
267 d['count-shares-expected'] = self._verifycap.total_shares
269 verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
270 servers = {} # {serverid: set(sharenums)}
271 corruptsharelocators = [] # (serverid, storageindex, sharenum)
272 incompatiblesharelocators = [] # (serverid, storageindex, sharenum)
274 for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
275 servers.setdefault(thisserverid, set()).update(theseverifiedshares)
276 for sharenum in theseverifiedshares:
277 verifiedshares.setdefault(sharenum, set()).add(thisserverid)
278 for sharenum in thesecorruptshares:
279 corruptsharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
280 for sharenum in theseincompatibleshares:
281 incompatiblesharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
283 d['count-shares-good'] = len(verifiedshares)
284 d['count-good-share-hosts'] = len([s for s in servers.keys() if servers[s]])
286 assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
287 if len(verifiedshares) == self._verifycap.total_shares:
289 cr.set_summary("Healthy")
291 cr.set_healthy(False)
292 cr.set_summary("Not Healthy: %d shares (enc %d-of-%d)" %
293 (len(verifiedshares),
294 self._verifycap.needed_shares,
295 self._verifycap.total_shares))
296 if len(verifiedshares) >= self._verifycap.needed_shares:
297 cr.set_recoverable(True)
298 d['count-recoverable-versions'] = 1
299 d['count-unrecoverable-versions'] = 0
301 cr.set_recoverable(False)
302 d['count-recoverable-versions'] = 0
303 d['count-unrecoverable-versions'] = 1
305 d['servers-responding'] = list(servers)
306 d['sharemap'] = verifiedshares
307 # no such thing as wrong shares of an immutable file
308 d['count-wrong-shares'] = 0
309 d['list-corrupt-shares'] = corruptsharelocators
310 d['count-corrupt-shares'] = len(corruptsharelocators)
311 d['list-incompatible-shares'] = incompatiblesharelocators
312 d['count-incompatible-shares'] = len(incompatiblesharelocators)
315 # The file needs rebalancing if the set of servers that have at least
316 # one share is less than the number of uniquely-numbered shares
318 cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])
327 for (serverid, ss) in self._servers:
328 ds.append(self._verify_server_shares(serverid, ss))
330 for (serverid, ss) in self._servers:
331 ds.append(self._check_server_shares(serverid, ss))
333 return deferredutil.gatherResults(ds).addCallback(self._format_results)