git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/checker.py
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / checker.py
1 from foolscap.api import DeadReferenceError, RemoteException
2 from twisted.internet import defer
3 from allmydata import hashtree
4 from allmydata.check_results import CheckResults
5 from allmydata.immutable import download
6 from allmydata.uri import CHKFileVerifierURI
7 from allmydata.util.assertutil import precondition
8 from allmydata.util import base32, deferredutil, dictutil, log
9 from allmydata.util.hashutil import file_renewal_secret_hash, \
10      file_cancel_secret_hash, bucket_renewal_secret_hash, \
11      bucket_cancel_secret_hash
12
13 from allmydata.immutable import layout
14
class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available.

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """
    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                 monitor):
        # precondition() raises AssertionError itself when the check fails
        # and returns None when it passes, so it must NOT be wrapped in
        # 'assert': that would make the assert fail on the None return even
        # for valid arguments, and the whole check would vanish under -O.
        precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
        precondition(isinstance(servers, (set, frozenset)), servers)
        for (serverid, serverrref) in servers:
            precondition(isinstance(serverid, str), serverid)

        # Log prefix: first 8 bytes of the storage index, base32-encoded.
        prefix = "%s" % base32.b2a_l(verifycap.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)

        self._verifycap = verifycap

        # NOTE(review): the monitor is stored here but raise_if_cancelled()
        # is never invoked anywhere in this class, despite the class
        # docstring's claim -- confirm whether cancellation checks belong in
        # start() / _verify_server_shares().
        self._monitor = monitor
        self._servers = servers
        self._verify = verify # bool: verify what the servers claim, or not?
        self._add_lease = add_lease

        # Populated (per-share) during verification; see _download_and_verify.
        self._share_hash_tree = None

        # Per-file lease secrets, from which per-bucket (per-server) secrets
        # are derived on demand in _get_renewal_secret/_get_cancel_secret.
        frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
                                       self._verifycap.storage_index)
        self.file_renewal_secret = frs
        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
                                      self._verifycap.storage_index)
        self.file_cancel_secret = fcs

    def _get_renewal_secret(self, peerid):
        """Derive the per-server lease renewal secret for 'peerid'."""
        return bucket_renewal_secret_hash(self.file_renewal_secret, peerid)
    def _get_cancel_secret(self, peerid):
        """Derive the per-server lease cancel secret for 'peerid'."""
        return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)

    def _get_buckets(self, server, storageindex, serverid):
        """Return a deferred that eventually fires with ({sharenum: bucket},
        serverid, success). In case the server is disconnected or returns a
        Failure then it fires with ({}, serverid, False) (A server
        disconnecting or returning a Failure when we ask it for buckets is
        the same, for our purposes, as a server that says it has none, except
        that we want to track and report whether or not each server
        responded.)"""

        d = server.callRemote("get_buckets", storageindex)
        if self._add_lease:
            renew_secret = self._get_renewal_secret(serverid)
            cancel_secret = self._get_cancel_secret(serverid)
            d2 = server.callRemote("add_lease", storageindex,
                                   renew_secret, cancel_secret)
            # consumeErrors=True so a failure in one branch is not logged as
            # unhandled; _done re-raises whatever we actually care about.
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            def _done(res):
                [(get_success, get_result),
                 (addlease_success, addlease_result)] = res
                # ignore remote IndexError on the add_lease call. Propagate
                # local errors and remote non-IndexErrors
                if addlease_success:
                    return get_result
                if not addlease_result.check(RemoteException):
                    # Propagate local errors
                    return addlease_result
                if addlease_result.value.failure.check(IndexError):
                    # tahoe=1.3.0 raised IndexError on non-existent
                    # buckets, which we ignore
                    return get_result
                # propagate remote errors that aren't IndexError, including
                # the unfortunate internal KeyError bug that <1.3.0 had.
                return addlease_result
            dl.addCallback(_done)
            d = dl

        def _wrap_results(res):
            # success path: the server answered get_buckets
            return (res, serverid, True)

        def _trap_errs(f):
            # A disconnect is merely UNUSUAL; any other failure is WEIRD.
            level = log.WEIRD
            if f.check(DeadReferenceError):
                level = log.UNUSUAL
            self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ")
            return ({}, serverid, False)

        d.addCallbacks(_wrap_results, _trap_errs)
        return d

    def _download_and_verify(self, serverid, sharenum, bucket):
        """Start an attempt to download and verify every block in this bucket
        and return a deferred that will eventually fire once the attempt
        completes.

        If you download and verify every block then fire with (True,
        sharenum, None), else if the share data couldn't be parsed because it
        was of an unknown version number fire with (False, sharenum,
        'incompatible'), else if any of the blocks were invalid, fire with
        (False, sharenum, 'corrupt'), else if the server disconnected (False,
        sharenum, 'disconnect'), else if the server returned a Failure during
        the process fire with (False, sharenum, 'failure').

        If there is an internal error such as an uncaught exception in this
        code, then the deferred will errback, but if there is a remote error
        such as the server failing or the returned data being incorrect then
        it will not errback -- it will fire normally with the indicated
        results."""

        b = layout.ReadBucketProxy(bucket, serverid, self._verifycap.storage_index)
        veup = download.ValidatedExtendedURIProxy(b, self._verifycap)
        d = veup.start()

        def _errb(f):
            # We didn't succeed at fetching and verifying all the blocks of
            # this share. Handle each reason for failure differently.

            if f.check(DeadReferenceError):
                return (False, sharenum, 'disconnect')
            elif f.check(RemoteException):
                return (False, sharenum, 'failure')
            elif f.check(layout.ShareVersionIncompatible):
                return (False, sharenum, 'incompatible')
            elif f.check(layout.LayoutInvalid,
                         layout.RidiculouslyLargeURIExtensionBlock,
                         download.BadOrMissingHash,
                         download.BadURIExtensionHashValue):
                return (False, sharenum, 'corrupt')

            # if it wasn't one of those reasons, re-raise the error
            return f

        def _got_ueb(vup):
            # NOTE(review): this rebinds self._share_hash_tree for every
            # share being verified; since each tree is seeded with the same
            # validated share_root_hash this looks harmless, but confirm
            # there is no cross-share interference when verifies overlap.
            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            vrbp = download.ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, vup.num_segments, vup.block_size, vup.share_size)

            ds = []
            for blocknum in range(vup.num_segments):
                def _discard_result(r):
                    assert isinstance(r, str), r
                    # to free up the RAM
                    return None
                d2 = vrbp.get_block(blocknum)
                d2.addCallback(_discard_result)
                ds.append(d2)

            dl = deferredutil.gatherResults(ds)
            # dl will fire once every block of this share has been downloaded
            # and verified, or else it will errback.

            def _cb(result):
                return (True, sharenum, None)

            dl.addCallback(_cb)
            return dl

        d.addCallback(_got_ueb)
        d.addErrback(_errb)

        return d

    def _verify_server_shares(self, serverid, ss):
        """ Return a deferred which eventually fires with a tuple of
        (set(sharenum), serverid, set(corruptsharenum),
        set(incompatiblesharenum), success) showing all the shares verified
        to be served by this server, and all the corrupt shares served by the
        server, and all the incompatible shares served by the server. In case
        the server is disconnected or returns a Failure then it fires with
        the last element False.

        A server disconnecting or returning a failure when we ask it for
        shares is the same, for our purposes, as a server that says it has
        none or offers invalid ones, except that we want to track and report
        the server's behavior. Similarly, the presence of corrupt shares is
        mainly of use for diagnostics -- you can typically treat it as just
        like being no share at all by just observing its absence from the
        verified shares dict and ignoring its presence in the corrupt shares
        dict.

        The 'success' argument means whether the server responded to *any*
        queries during this process, so if it responded to some queries and
        then disconnected and ceased responding, or returned a failure, it is
        still marked with the True flag for 'success'.
        """
        d = self._get_buckets(ss, self._verifycap.storage_index, serverid)

        def _got_buckets(result):
            bucketdict, serverid, success = result

            # Kick off a download-and-verify for every share this server
            # claims to hold; each fires with (succ, sharenum, whynot).
            shareverds = []
            for (sharenum, bucket) in bucketdict.items():
                d = self._download_and_verify(serverid, sharenum, bucket)
                shareverds.append(d)

            dl = deferredutil.gatherResults(shareverds)

            def collect(results):
                verified = set()
                corrupt = set()
                incompatible = set()
                for succ, sharenum, whynot in results:
                    if succ:
                        verified.add(sharenum)
                    else:
                        if whynot == 'corrupt':
                            corrupt.add(sharenum)
                        elif whynot == 'incompatible':
                            incompatible.add(sharenum)
                return (verified, serverid, corrupt, incompatible, success)

            dl.addCallback(collect)
            return dl

        def _err(f):
            # Only remote/disconnect failures are absorbed here; local bugs
            # still errback. trap() re-raises anything else.
            f.trap(RemoteException, DeadReferenceError)
            return (set(), serverid, set(), set(), False)

        d.addCallbacks(_got_buckets, _err)
        return d

    def _check_server_shares(self, serverid, ss):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), serverid, set(), set(), responded) showing all the
        shares claimed to be served by this server. In case the server is
        disconnected then it fires with (set() serverid, set(), set(), False)
        (a server disconnecting when we ask it for buckets is the same, for
        our purposes, as a server that says it has none, except that we want
        to track and report whether or not each server responded.)"""
        def _curry_empty_corrupted(res):
            # Reshape _get_buckets' 3-tuple into the same 5-tuple shape that
            # _verify_server_shares produces (with empty corrupt/incompatible
            # sets, since no verification was done).
            buckets, serverid, responded = res
            return (set(buckets), serverid, set(), set(), responded)
        d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
        d.addCallback(_curry_empty_corrupted)
        return d

    def _format_results(self, results):
        """Fold the per-server 5-tuples from _check_server_shares /
        _verify_server_shares into a populated CheckResults instance."""
        cr = CheckResults(self._verifycap, self._verifycap.storage_index)
        d = {}
        d['count-shares-needed'] = self._verifycap.needed_shares
        d['count-shares-expected'] = self._verifycap.total_shares

        verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
        servers = {} # {serverid: set(sharenums)}
        responding = set() # serverids which answered at least one query
        corruptsharelocators = [] # (serverid, storageindex, sharenum)
        incompatiblesharelocators = [] # (serverid, storageindex, sharenum)

        for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
            servers.setdefault(thisserverid, set()).update(theseverifiedshares)
            # BUGFIX: only servers whose 'responded' flag is True belong in
            # 'servers-responding'; previously every contacted server was
            # listed regardless of whether it answered.
            if thisresponded:
                responding.add(thisserverid)
            for sharenum in theseverifiedshares:
                verifiedshares.setdefault(sharenum, set()).add(thisserverid)
            for sharenum in thesecorruptshares:
                corruptsharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
            for sharenum in theseincompatibleshares:
                incompatiblesharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))

        d['count-shares-good'] = len(verifiedshares)
        d['count-good-share-hosts'] = len([s for s in servers.keys() if servers[s]])

        assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
        if len(verifiedshares) == self._verifycap.total_shares:
            cr.set_healthy(True)
            cr.set_summary("Healthy")
        else:
            cr.set_healthy(False)
            cr.set_summary("Not Healthy: %d shares (enc %d-of-%d)" %
                           (len(verifiedshares),
                            self._verifycap.needed_shares,
                            self._verifycap.total_shares))
        if len(verifiedshares) >= self._verifycap.needed_shares:
            cr.set_recoverable(True)
            d['count-recoverable-versions'] = 1
            d['count-unrecoverable-versions'] = 0
        else:
            cr.set_recoverable(False)
            d['count-recoverable-versions'] = 0
            d['count-unrecoverable-versions'] = 1

        d['servers-responding'] = list(responding)
        d['sharemap'] = verifiedshares
        # no such thing as wrong shares of an immutable file
        d['count-wrong-shares'] = 0
        d['list-corrupt-shares'] = corruptsharelocators
        d['count-corrupt-shares'] = len(corruptsharelocators)
        d['list-incompatible-shares'] = incompatiblesharelocators
        d['count-incompatible-shares'] = len(incompatiblesharelocators)


        # The file needs rebalancing if the set of servers that have at least
        # one share is less than the number of uniquely-numbered shares
        # available.
        cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])

        cr.set_data(d)

        return cr

    def start(self):
        """Query (or verify) every server and return a deferred that fires
        with the aggregated CheckResults."""
        ds = []
        if self._verify:
            for (serverid, ss) in self._servers:
                ds.append(self._verify_server_shares(serverid, ss))
        else:
            for (serverid, ss) in self._servers:
                ds.append(self._check_server_shares(serverid, ss))

        return deferredutil.gatherResults(ds).addCallback(self._format_results)