from foolscap.api import DeadReferenceError, RemoteException
from twisted.internet import defer
from allmydata.hashtree import IncompleteHashTree
from allmydata.check_results import CheckResults
from allmydata.immutable import download
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, dictutil, log
from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash

from allmydata.immutable import layout

class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available (where M is the total number of shares for this file).

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """
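    # A minimal usage sketch (illustrative only; 'servers', 'sh', and 'mon'
    # are assumed to be supplied by the caller, e.g. a set of
    # (serverid, rref) pairs, a SecretHolder, and a Monitor):
    #
    #   c = Checker(verifycap, servers, verify=True, add_lease=False,
    #               secret_holder=sh, monitor=mon)
    #   d = c.start()
    #   d.addCallback(lambda cr: cr.is_healthy()) # fires with a CheckResults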
    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                 monitor):
        # precondition() raises on failure (and returns None), so wrapping
        # it in 'assert' would itself always fail; call it directly.
        precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
        precondition(isinstance(servers, (set, frozenset)), servers)
        for (serverid, serverref) in servers:
            precondition(isinstance(serverid, str), serverid)

        prefix = "%s" % base32.b2a_l(verifycap.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)

        self._verifycap = verifycap

        self._monitor = monitor
        self._servers = servers
        self._verify = verify # bool: verify what the servers claim, or not?
        self._add_lease = add_lease

        frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
                                       self._verifycap.storage_index)
        self.file_renewal_secret = frs
        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
                                      self._verifycap.storage_index)
        self.file_cancel_secret = fcs

    def _get_renewal_secret(self, peerid):
        return bucket_renewal_secret_hash(self.file_renewal_secret, peerid)

    def _get_cancel_secret(self, peerid):
        return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)

    def _get_buckets(self, server, storageindex, serverid):
        """Return a deferred that eventually fires with ({sharenum: bucket},
        serverid, success). If the server is disconnected or returns a
        Failure, it fires with ({}, serverid, False). (A server that
        disconnects or returns a Failure when we ask it for buckets is, for
        our purposes, the same as a server that says it has none, except that
        we want to track and report whether or not each server responded.)"""

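        # Illustrative consumer of the returned deferred (a sketch, not part
        # of this module; the real consumers are _verify_server_shares and
        # _check_server_shares below):
        #
        #   def _got(res):
        #       bucketdict, serverid, responded = res
        #       return set(bucketdict.keys()) # the claimed share numbers
        #   d.addCallback(_got)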
        d = server.callRemote("get_buckets", storageindex)
        if self._add_lease:
            renew_secret = self._get_renewal_secret(serverid)
            cancel_secret = self._get_cancel_secret(serverid)
            d2 = server.callRemote("add_lease", storageindex,
                                   renew_secret, cancel_secret)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            def _done(res):
                [(get_success, get_result),
                 (addlease_success, addlease_result)] = res
                # ignore remote IndexError on the add_lease call. Propagate
                # local errors and remote non-IndexErrors
                if addlease_success:
                    return get_result
                if not addlease_result.check(RemoteException):
                    # Propagate local errors
                    return addlease_result
                if addlease_result.value.failure.check(IndexError):
                    # tahoe 1.3.0 raised IndexError on non-existent
                    # buckets, which we ignore
                    return get_result
                # propagate remote errors that aren't IndexError, including
                # the unfortunate internal KeyError bug that <1.3.0 had.
                return addlease_result
            dl.addCallback(_done)
            d = dl

        def _wrap_results(res):
            return (res, serverid, True)

        def _trap_errs(f):
            level = log.WEIRD
            if f.check(DeadReferenceError):
                level = log.UNUSUAL
            self.log("failure from server on 'get_buckets': the REMOTE failure was:",
                     facility="tahoe.immutable.checker", failure=f,
                     level=level, umid="3uuBUQ")
            return ({}, serverid, False)

        d.addCallbacks(_wrap_results, _trap_errs)
        return d

    def _download_and_verify(self, serverid, sharenum, bucket):
        """Start an attempt to download and verify every block in this bucket
        and return a deferred that will eventually fire once the attempt
        completes.

        If every block is downloaded and verified, fire with (True, sharenum,
        None). Otherwise fire with (False, sharenum, reason), where reason is
        'incompatible' if the share data couldn't be parsed because it was of
        an unknown version number, 'corrupt' if any of the blocks were
        invalid, 'disconnect' if the server disconnected, or 'failure' if the
        server returned a Failure during the process.

        If there is an internal error such as an uncaught exception in this
        code, then the deferred will errback, but if there is a remote error
        such as the server failing or the returned data being incorrect, it
        will not errback -- it will fire normally with the indicated
        results."""

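        # Orientation (descriptive comment): verification proceeds in
        # stages, each gated on the previous one succeeding: validate the
        # URI extension block (UEB), then fetch+validate all share hashes,
        # all block hashes, and all crypttext hashes, and finally download
        # and validate every block. Any validation failure errbacks and is
        # mapped to a reason string in _errb below.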
        vcap = self._verifycap
        b = layout.ReadBucketProxy(bucket, serverid, vcap.storage_index)
        veup = download.ValidatedExtendedURIProxy(b, vcap)
        d = veup.start()

        def _got_ueb(vup):
            share_hash_tree = IncompleteHashTree(vcap.total_shares)
            share_hash_tree.set_hashes({0: vup.share_root_hash})

            vrbp = download.ValidatedReadBucketProxy(sharenum, b,
                                                     share_hash_tree,
                                                     vup.num_segments,
                                                     vup.block_size,
                                                     vup.share_size)

            # note: normal download doesn't use get_all_sharehashes(),
            # because it gets more data than necessary. We've discussed the
            # security properties of having verification and download look
            # identical (so the server couldn't, say, provide good responses
            # for one and not the other), but I think that full verification
            # is more important than defending against inconsistent server
            # behavior. Besides, they can't pass the verifier without storing
            # all the data, so there's not so much to be gained by behaving
            # inconsistently.
            d = vrbp.get_all_sharehashes()
            # we fill share_hash_tree before fetching any blocks, so the
            # block fetches won't send redundant share-hash-tree requests;
            # this speeds things up. Then we fetch+validate all the block
            # hashes.
            d.addCallback(lambda ign: vrbp.get_all_blockhashes())

            cht = IncompleteHashTree(vup.num_segments)
            cht.set_hashes({0: vup.crypttext_root_hash})
            d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))

            d.addCallback(lambda ign: vrbp)
            return d
        d.addCallback(_got_ueb)

        def _discard_result(r):
            assert isinstance(r, str), r
            # to free up the RAM
            return None

        def _get_blocks(vrbp):
            ds = []
            for blocknum in range(veup.num_segments):
                db = vrbp.get_block(blocknum)
                db.addCallback(_discard_result)
                ds.append(db)
            # this gatherResults will fire once every block of this share has
            # been downloaded and verified, or else it will errback.
            return deferredutil.gatherResults(ds)
        d.addCallback(_get_blocks)

        # if none of those errbacked, the blocks (and the hashes above them)
        # are good
        def _all_good(ign):
            return (True, sharenum, None)
        d.addCallback(_all_good)

        # but if anything fails, we'll land here
        def _errb(f):
            # We didn't succeed at fetching and verifying all the blocks of
            # this share. Handle each reason for failure differently.

            if f.check(DeadReferenceError):
                return (False, sharenum, 'disconnect')
            elif f.check(RemoteException):
                return (False, sharenum, 'failure')
            elif f.check(layout.ShareVersionIncompatible):
                return (False, sharenum, 'incompatible')
            elif f.check(layout.LayoutInvalid,
                         layout.RidiculouslyLargeURIExtensionBlock,
                         download.BadOrMissingHash,
                         download.BadURIExtensionHashValue):
                return (False, sharenum, 'corrupt')

            # if it wasn't one of those reasons, re-raise the error
            return f
        d.addErrback(_errb)

        return d

    def _verify_server_shares(self, serverid, ss):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), serverid, set(corruptsharenum),
        set(incompatiblesharenum), success) showing all the shares verified
        to be served by this server, and all the corrupt and incompatible
        shares served by it. If the server is disconnected or returns a
        Failure, it fires with the last element False.

        A server that disconnects or returns a failure when we ask it for
        shares is, for our purposes, the same as a server that says it has
        none or offers invalid ones, except that we want to track and report
        the server's behavior. Similarly, the presence of corrupt shares is
        mainly of use for diagnostics -- you can typically treat a corrupt
        share just like a missing one, by observing its absence from the
        verified shares and ignoring its presence among the corrupt shares.

        The 'success' element indicates whether the server responded to *any*
        queries during this process; a server that answered some queries and
        then disconnected, ceased responding, or returned a failure is still
        marked True for 'success'.
        """
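        # For example (illustrative only): a server holding a good copy of
        # share 0 and a corrupt copy of share 3 would yield
        # ({0}, serverid, {3}, set(), True).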
        d = self._get_buckets(ss, self._verifycap.storage_index, serverid)

        def _got_buckets(result):
            bucketdict, serverid, success = result

            shareverds = []
            for (sharenum, bucket) in bucketdict.items():
                d = self._download_and_verify(serverid, sharenum, bucket)
                shareverds.append(d)

            dl = deferredutil.gatherResults(shareverds)

            def collect(results):
                verified = set()
                corrupt = set()
                incompatible = set()
                for succ, sharenum, whynot in results:
                    if succ:
                        verified.add(sharenum)
                    else:
                        if whynot == 'corrupt':
                            corrupt.add(sharenum)
                        elif whynot == 'incompatible':
                            incompatible.add(sharenum)
                return (verified, serverid, corrupt, incompatible, success)

            dl.addCallback(collect)
            return dl

        def _err(f):
            f.trap(RemoteException, DeadReferenceError)
            return (set(), serverid, set(), set(), False)

        d.addCallbacks(_got_buckets, _err)
        return d

    def _check_server_shares(self, serverid, ss):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), serverid, set(), set(), responded) showing all the
        shares claimed to be served by this server. If the server is
        disconnected, it fires with (set(), serverid, set(), set(), False).
        (A server that disconnects when we ask it for buckets is, for our
        purposes, the same as a server that says it has none, except that we
        want to track and report whether or not each server responded.)"""
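        # For example (illustrative only): a server claiming shares 0 and 5
        # fires ({0, 5}, serverid, set(), set(), True). No verification is
        # attempted here, so the corrupt and incompatible sets are always
        # empty.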
        def _curry_empty_corrupted(res):
            buckets, serverid, responded = res
            return (set(buckets), serverid, set(), set(), responded)
        d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
        d.addCallback(_curry_empty_corrupted)
        return d

    def _format_results(self, results):
        cr = CheckResults(self._verifycap, self._verifycap.storage_index)
        d = {}
        d['count-shares-needed'] = self._verifycap.needed_shares
        d['count-shares-expected'] = self._verifycap.total_shares

        verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
        servers = {} # {serverid: set(sharenums)}
        servers_responding = set() # serverids that answered our queries
        corruptsharelocators = [] # (serverid, storageindex, sharenum)
        incompatiblesharelocators = [] # (serverid, storageindex, sharenum)

        for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
            servers.setdefault(thisserverid, set()).update(theseverifiedshares)
            if thisresponded:
                servers_responding.add(thisserverid)
            for sharenum in theseverifiedshares:
                verifiedshares.setdefault(sharenum, set()).add(thisserverid)
            for sharenum in thesecorruptshares:
                corruptsharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
            for sharenum in theseincompatibleshares:
                incompatiblesharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))

        d['count-shares-good'] = len(verifiedshares)
        d['count-good-share-hosts'] = len([s for s in servers.keys() if servers[s]])

        assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
        if len(verifiedshares) == self._verifycap.total_shares:
            cr.set_healthy(True)
            cr.set_summary("Healthy")
        else:
            cr.set_healthy(False)
            cr.set_summary("Not Healthy: %d shares (enc %d-of-%d)" %
                           (len(verifiedshares),
                            self._verifycap.needed_shares,
                            self._verifycap.total_shares))
        if len(verifiedshares) >= self._verifycap.needed_shares:
            cr.set_recoverable(True)
            d['count-recoverable-versions'] = 1
            d['count-unrecoverable-versions'] = 0
        else:
            cr.set_recoverable(False)
            d['count-recoverable-versions'] = 0
            d['count-unrecoverable-versions'] = 1

        # only servers that actually answered our queries count as
        # responding, not every server we asked
        d['servers-responding'] = list(servers_responding)
        d['sharemap'] = verifiedshares
        # no such thing as wrong shares of an immutable file
        d['count-wrong-shares'] = 0
        d['list-corrupt-shares'] = corruptsharelocators
        d['count-corrupt-shares'] = len(corruptsharelocators)
        d['list-incompatible-shares'] = incompatiblesharelocators
        d['count-incompatible-shares'] = len(incompatiblesharelocators)

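        # Illustrative shape of d for a healthy 3-of-10 file whose ten
        # shares were all verified on ten distinct servers:
        # {'count-shares-needed': 3, 'count-shares-expected': 10,
        #  'count-shares-good': 10, 'count-good-share-hosts': 10,
        #  'count-recoverable-versions': 1, 'count-unrecoverable-versions': 0,
        #  'count-wrong-shares': 0, 'count-corrupt-shares': 0, ...}
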
        # The file needs rebalancing if the number of servers holding at
        # least one share is smaller than the number of uniquely-numbered
        # shares available.
        cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])

        cr.set_data(d)

        return cr

    def start(self):
        ds = []
        if self._verify:
            for (serverid, ss) in self._servers:
                ds.append(self._verify_server_shares(serverid, ss))
        else:
            for (serverid, ss) in self._servers:
                ds.append(self._check_server_shares(serverid, ss))

        return deferredutil.gatherResults(ds).addCallback(self._format_results)