]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/checker.py
cbb8637d1102c30923e787a3bd53d27a1f25ab57
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / checker.py
1 from foolscap.api import DeadReferenceError, RemoteException
2 from allmydata.hashtree import IncompleteHashTree
3 from allmydata.check_results import CheckResults
4 from allmydata.immutable import download
5 from allmydata.uri import CHKFileVerifierURI
6 from allmydata.util.assertutil import precondition
7 from allmydata.util import base32, idlib, deferredutil, dictutil, log
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash
11
12 from allmydata.immutable import layout
13
14 class Checker(log.PrefixingLogMixin):
15     """I query all servers to see if M uniquely-numbered shares are
16     available.
17
18     If the verify flag was passed to my constructor, then for each share I
19     download every data block and all metadata from each server and perform a
20     cryptographic integrity check on all of it. If not, I just ask each
21     server 'Which shares do you have?' and believe its answer.
22
23     In either case, I wait until I have gotten responses from all servers.
24     This fact -- that I wait -- means that an ill-behaved server which fails
25     to answer my questions will make me wait indefinitely. If it is
26     ill-behaved in a way that triggers the underlying foolscap timeouts, then
27     I will wait only as long as those foolscap timeouts, but if it is
28     ill-behaved in a way which placates the foolscap timeouts but still
29     doesn't answer my question then I will wait indefinitely.
30
31     Before I send any new request to a server, I always ask the 'monitor'
32     object that was passed into my constructor whether this task has been
33     cancelled (by invoking its raise_if_cancelled() method).
34     """
35     def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
36                  monitor):
37         assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
38         assert precondition(isinstance(servers, (set, frozenset)), servers)
39         for (serverid, serverrref) in servers:
40             assert precondition(isinstance(serverid, str))
41
42         prefix = "%s" % base32.b2a_l(verifycap.storage_index[:8], 60)
43         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
44
45         self._verifycap = verifycap
46
47         self._monitor = monitor
48         self._servers = servers
49         self._verify = verify # bool: verify what the servers claim, or not?
50         self._add_lease = add_lease
51
52         frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
53                                        self._verifycap.storage_index)
54         self.file_renewal_secret = frs
55         fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
56                                       self._verifycap.storage_index)
57         self.file_cancel_secret = fcs
58
59     def _get_renewal_secret(self, peerid):
60         return bucket_renewal_secret_hash(self.file_renewal_secret, peerid)
61     def _get_cancel_secret(self, peerid):
62         return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
63
64     def _get_buckets(self, server, storageindex, serverid):
65         """Return a deferred that eventually fires with ({sharenum: bucket},
66         serverid, success). In case the server is disconnected or returns a
67         Failure then it fires with ({}, serverid, False) (A server
68         disconnecting or returning a Failure when we ask it for buckets is
69         the same, for our purposes, as a server that says it has none, except
70         that we want to track and report whether or not each server
71         responded.)"""
72
73         if self._add_lease:
74             renew_secret = self._get_renewal_secret(serverid)
75             cancel_secret = self._get_cancel_secret(serverid)
76             d2 = server.callRemote("add_lease", storageindex,
77                                    renew_secret, cancel_secret)
78             d2.addErrback(self._add_lease_failed, serverid, storageindex)
79
80         d = server.callRemote("get_buckets", storageindex)
81         def _wrap_results(res):
82             return (res, serverid, True)
83
84         def _trap_errs(f):
85             level = log.WEIRD
86             if f.check(DeadReferenceError):
87                 level = log.UNUSUAL
88             self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ")
89             return ({}, serverid, False)
90
91         d.addCallbacks(_wrap_results, _trap_errs)
92         return d
93
94     def _add_lease_failed(self, f, peerid, storage_index):
95         # Older versions of Tahoe didn't handle the add-lease message very
96         # well: <=1.1.0 throws a NameError because it doesn't implement
97         # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
98         # (which is most of them, since we send add-lease to everybody,
99         # before we know whether or not they have any shares for us), and
100         # 1.2.0 throws KeyError even on known buckets due to an internal bug
101         # in the latency-measuring code.
102
103         # we want to ignore the known-harmless errors and log the others. In
104         # particular we want to log any local errors caused by coding
105         # problems.
106
107         if f.check(DeadReferenceError):
108             return
109         if f.check(RemoteException):
110             if f.value.failure.check(KeyError, IndexError, NameError):
111                 # this may ignore a bit too much, but that only hurts us
112                 # during debugging
113                 return
114             self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
115                      peerid=idlib.shortnodeid_b2a(peerid),
116                      f_value=str(f.value),
117                      failure=f,
118                      level=log.WEIRD, umid="atbAxw")
119             return
120         # local errors are cause for alarm
121         log.err(format="local error in add_lease to [%(peerid)s]: %(f_value)s",
122                 peerid=idlib.shortnodeid_b2a(peerid),
123                 f_value=str(f.value),
124                 failure=f,
125                 level=log.WEIRD, umid="hEGuQg")
126
127
128     def _download_and_verify(self, serverid, sharenum, bucket):
129         """Start an attempt to download and verify every block in this bucket
130         and return a deferred that will eventually fire once the attempt
131         completes.
132
133         If you download and verify every block then fire with (True,
134         sharenum, None), else if the share data couldn't be parsed because it
135         was of an unknown version number fire with (False, sharenum,
136         'incompatible'), else if any of the blocks were invalid, fire with
137         (False, sharenum, 'corrupt'), else if the server disconnected (False,
138         sharenum, 'disconnect'), else if the server returned a Failure during
139         the process fire with (False, sharenum, 'failure').
140
141         If there is an internal error such as an uncaught exception in this
142         code, then the deferred will errback, but if there is a remote error
143         such as the server failing or the returned data being incorrect then
144         it will not errback -- it will fire normally with the indicated
145         results."""
146
147         vcap = self._verifycap
148         b = layout.ReadBucketProxy(bucket, serverid, vcap.storage_index)
149         veup = download.ValidatedExtendedURIProxy(b, vcap)
150         d = veup.start()
151
152         def _got_ueb(vup):
153             share_hash_tree = IncompleteHashTree(vcap.total_shares)
154             share_hash_tree.set_hashes({0: vup.share_root_hash})
155
156             vrbp = download.ValidatedReadBucketProxy(sharenum, b,
157                                                      share_hash_tree,
158                                                      vup.num_segments,
159                                                      vup.block_size,
160                                                      vup.share_size)
161
162             # note: normal download doesn't use get_all_sharehashes(),
163             # because it gets more data than necessary. We've discussed the
164             # security properties of having verification and download look
165             # identical (so the server couldn't, say, provide good responses
166             # for one and not the other), but I think that full verification
167             # is more important than defending against inconsistent server
168             # behavior. Besides, they can't pass the verifier without storing
169             # all the data, so there's not so much to be gained by behaving
170             # inconsistently.
171             d = vrbp.get_all_sharehashes()
172             # we fill share_hash_tree before fetching any blocks, so the
173             # block fetches won't send redundant share-hash-tree requests, to
174             # speed things up. Then we fetch+validate all the blockhashes.
175             d.addCallback(lambda ign: vrbp.get_all_blockhashes())
176
177             cht = IncompleteHashTree(vup.num_segments)
178             cht.set_hashes({0: vup.crypttext_root_hash})
179             d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))
180
181             d.addCallback(lambda ign: vrbp)
182             return d
183         d.addCallback(_got_ueb)
184
185         def _discard_result(r):
186             assert isinstance(r, str), r
187             # to free up the RAM
188             return None
189         def _get_blocks(vrbp):
190             ds = []
191             for blocknum in range(veup.num_segments):
192                 db = vrbp.get_block(blocknum)
193                 db.addCallback(_discard_result)
194                 ds.append(db)
195             # this gatherResults will fire once every block of this share has
196             # been downloaded and verified, or else it will errback.
197             return deferredutil.gatherResults(ds)
198         d.addCallback(_get_blocks)
199
200         # if none of those errbacked, the blocks (and the hashes above them)
201         # are good
202         def _all_good(ign):
203             return (True, sharenum, None)
204         d.addCallback(_all_good)
205
206         # but if anything fails, we'll land here
207         def _errb(f):
208             # We didn't succeed at fetching and verifying all the blocks of
209             # this share. Handle each reason for failure differently.
210
211             if f.check(DeadReferenceError):
212                 return (False, sharenum, 'disconnect')
213             elif f.check(RemoteException):
214                 return (False, sharenum, 'failure')
215             elif f.check(layout.ShareVersionIncompatible):
216                 return (False, sharenum, 'incompatible')
217             elif f.check(layout.LayoutInvalid,
218                          layout.RidiculouslyLargeURIExtensionBlock,
219                          download.BadOrMissingHash,
220                          download.BadURIExtensionHashValue):
221                 return (False, sharenum, 'corrupt')
222
223             # if it wasn't one of those reasons, re-raise the error
224             return f
225         d.addErrback(_errb)
226
227         return d
228
229     def _verify_server_shares(self, serverid, ss):
230         """ Return a deferred which eventually fires with a tuple of
231         (set(sharenum), serverid, set(corruptsharenum),
232         set(incompatiblesharenum), success) showing all the shares verified
233         to be served by this server, and all the corrupt shares served by the
234         server, and all the incompatible shares served by the server. In case
235         the server is disconnected or returns a Failure then it fires with
236         the last element False.
237
238         A server disconnecting or returning a failure when we ask it for
239         shares is the same, for our purposes, as a server that says it has
240         none or offers invalid ones, except that we want to track and report
241         the server's behavior. Similarly, the presence of corrupt shares is
242         mainly of use for diagnostics -- you can typically treat it as just
243         like being no share at all by just observing its absence from the
244         verified shares dict and ignoring its presence in the corrupt shares
245         dict.
246
247         The 'success' argument means whether the server responded to *any*
248         queries during this process, so if it responded to some queries and
249         then disconnected and ceased responding, or returned a failure, it is
250         still marked with the True flag for 'success'.
251         """
252         d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
253
254         def _got_buckets(result):
255             bucketdict, serverid, success = result
256
257             shareverds = []
258             for (sharenum, bucket) in bucketdict.items():
259                 d = self._download_and_verify(serverid, sharenum, bucket)
260                 shareverds.append(d)
261
262             dl = deferredutil.gatherResults(shareverds)
263
264             def collect(results):
265                 verified = set()
266                 corrupt = set()
267                 incompatible = set()
268                 for succ, sharenum, whynot in results:
269                     if succ:
270                         verified.add(sharenum)
271                     else:
272                         if whynot == 'corrupt':
273                             corrupt.add(sharenum)
274                         elif whynot == 'incompatible':
275                             incompatible.add(sharenum)
276                 return (verified, serverid, corrupt, incompatible, success)
277
278             dl.addCallback(collect)
279             return dl
280
281         def _err(f):
282             f.trap(RemoteException, DeadReferenceError)
283             return (set(), serverid, set(), set(), False)
284
285         d.addCallbacks(_got_buckets, _err)
286         return d
287
288     def _check_server_shares(self, serverid, ss):
289         """Return a deferred which eventually fires with a tuple of
290         (set(sharenum), serverid, set(), set(), responded) showing all the
291         shares claimed to be served by this server. In case the server is
292         disconnected then it fires with (set() serverid, set(), set(), False)
293         (a server disconnecting when we ask it for buckets is the same, for
294         our purposes, as a server that says it has none, except that we want
295         to track and report whether or not each server responded.)"""
296         def _curry_empty_corrupted(res):
297             buckets, serverid, responded = res
298             return (set(buckets), serverid, set(), set(), responded)
299         d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
300         d.addCallback(_curry_empty_corrupted)
301         return d
302
303     def _format_results(self, results):
304         cr = CheckResults(self._verifycap, self._verifycap.storage_index)
305         d = {}
306         d['count-shares-needed'] = self._verifycap.needed_shares
307         d['count-shares-expected'] = self._verifycap.total_shares
308
309         verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
310         servers = {} # {serverid: set(sharenums)}
311         corruptsharelocators = [] # (serverid, storageindex, sharenum)
312         incompatiblesharelocators = [] # (serverid, storageindex, sharenum)
313
314         for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
315             servers.setdefault(thisserverid, set()).update(theseverifiedshares)
316             for sharenum in theseverifiedshares:
317                 verifiedshares.setdefault(sharenum, set()).add(thisserverid)
318             for sharenum in thesecorruptshares:
319                 corruptsharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
320             for sharenum in theseincompatibleshares:
321                 incompatiblesharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
322
323         d['count-shares-good'] = len(verifiedshares)
324         d['count-good-share-hosts'] = len([s for s in servers.keys() if servers[s]])
325
326         assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
327         if len(verifiedshares) == self._verifycap.total_shares:
328             cr.set_healthy(True)
329             cr.set_summary("Healthy")
330         else:
331             cr.set_healthy(False)
332             cr.set_summary("Not Healthy: %d shares (enc %d-of-%d)" %
333                            (len(verifiedshares),
334                             self._verifycap.needed_shares,
335                             self._verifycap.total_shares))
336         if len(verifiedshares) >= self._verifycap.needed_shares:
337             cr.set_recoverable(True)
338             d['count-recoverable-versions'] = 1
339             d['count-unrecoverable-versions'] = 0
340         else:
341             cr.set_recoverable(False)
342             d['count-recoverable-versions'] = 0
343             d['count-unrecoverable-versions'] = 1
344
345         d['servers-responding'] = list(servers)
346         d['sharemap'] = verifiedshares
347         # no such thing as wrong shares of an immutable file
348         d['count-wrong-shares'] = 0
349         d['list-corrupt-shares'] = corruptsharelocators
350         d['count-corrupt-shares'] = len(corruptsharelocators)
351         d['list-incompatible-shares'] = incompatiblesharelocators
352         d['count-incompatible-shares'] = len(incompatiblesharelocators)
353
354
355         # The file needs rebalancing if the set of servers that have at least
356         # one share is less than the number of uniquely-numbered shares
357         # available.
358         cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])
359
360         cr.set_data(d)
361
362         return cr
363
364     def start(self):
365         ds = []
366         if self._verify:
367             for (serverid, ss) in self._servers:
368                 ds.append(self._verify_server_shares(serverid, ss))
369         else:
370             for (serverid, ss) in self._servers:
371                 ds.append(self._check_server_shares(serverid, ss))
372
373         return deferredutil.gatherResults(ds).addCallback(self._format_results)