# tahoe-lafs/tahoe-lafs.git -- src/allmydata/immutable/checker.py

"""
Given a StorageIndex, count how many shares we can find.

This does no verification of the shares whatsoever. If a peer claims to
have a share, we believe it.
"""

from twisted.internet import defer
from twisted.python import log
from allmydata import storage
from allmydata.checker_results import CheckerResults
from allmydata.immutable import download
from allmydata.uri import CHKFileURI
from allmydata.util import hashutil
from allmydata.util.assertutil import precondition

class SimpleCHKFileChecker:
    """Count how many shares of a CHK file can be located.

    start() returns a Deferred that fires with a CheckerResults instance.
    self.sharemap maps each share number to a list of (binary) nodeids of
    the shareholders that reported it."""
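    # Typical usage (a sketch; 'client' must expose get_permuted_peers(), as
    # used in _get_all_shareholders below, and 'u' is the file's URI object):
    #   c = SimpleCHKFileChecker(client, u, storage_index, k, N)
    #   d = c.start()
    #   d.addCallback(lambda results: ...)  # 'results' is a CheckerResults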

    def __init__(self, client, uri, storage_index, needed_shares, total_shares):
        self.peer_getter = client.get_permuted_peers
        self.needed_shares = needed_shares
        self.total_shares = total_shares
        self.found_shares = set()
        self.uri = uri
        self.storage_index = storage_index
        self.sharemap = {}
        self.responded = set()

    '''
    def check_synchronously(self, si):
        # this is how we would write this class if we were using synchronous
        # messages (or if we used promises).
        found = set()
        for (pmpeerid, peerid, connection) in self.peer_getter(si):
            buckets = connection.get_buckets(si)
            found.update(buckets.keys())
        return len(found)
    '''
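    # The asynchronous implementation below does the same survey: start()
    # sends a get_buckets query to every permuted storage server, gathers the
    # answers into found_shares/sharemap, and summarizes them in _done().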

    def start(self):
        d = self._get_all_shareholders(self.storage_index)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        dl = []
        for (peerid, ss) in self.peer_getter("storage", storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        # _done() must not run until every server has either responded or
        # errored, so wait for the whole DeferredList
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: maps shnum to a bucket-reader rref on the server
        # that holds that share
        self.found_shares.update(buckets.keys())
        for k in buckets:
            self.sharemap.setdefault(k, []).append(peerid)
        self.responded.add(peerid)

    def _got_error(self, f):
        if f.check(KeyError):
            # we presume a KeyError just means that server has no buckets
            # for us, and is not worth logging
            return
        log.err(f)

    def _done(self, res):
        r = CheckerResults(self.uri.to_string(), self.storage_index)
        report = []
        healthy = len(self.found_shares) >= self.total_shares
        r.set_healthy(healthy)
        recoverable = len(self.found_shares) >= self.needed_shares
        r.set_recoverable(recoverable)
        data = {"count-shares-good": len(self.found_shares),
                "count-shares-needed": self.needed_shares,
                "count-shares-expected": self.total_shares,
                "count-wrong-shares": 0,
                }
        if recoverable:
            data["count-recoverable-versions"] = 1
            data["count-unrecoverable-versions"] = 0
        else:
            data["count-recoverable-versions"] = 0
            data["count-unrecoverable-versions"] = 1

        data["count-corrupt-shares"] = 0 # non-verifier doesn't see corruption
        data["list-corrupt-shares"] = []
        hosts = set()
        sharemap = {}
        for (shnum, nodeids) in self.sharemap.items():
            hosts.update(nodeids)
            sharemap[shnum] = nodeids
        data["count-good-share-hosts"] = len(hosts)
        data["servers-responding"] = list(self.responded)
        data["sharemap"] = sharemap

        r.set_data(data)
        r.set_needs_rebalancing(len(self.found_shares) > len(hosts))

        #r.stuff = (self.needed_shares, self.total_shares,
        #            len(self.found_shares), self.sharemap)
        if len(self.found_shares) < self.total_shares:
            wanted = set(range(self.total_shares))
            missing = wanted - self.found_shares
            report.append("Missing shares: %s" %
                          ",".join(["sh%d" % shnum
                                    for shnum in sorted(missing)]))
        r.set_report(report)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Not Healthy")
            # TODO: more detail
        return r

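# VerifyingOutput is handed to the FileDownloader machinery as its output
# target. It stores nothing: each decoded crypttext segment is hashed and
# checked against the file's crypttext hash tree, so simply completing the
# "download" is what verifies the shares that were used.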
class VerifyingOutput:
    def __init__(self, total_length, results):
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self.length = 0
        self.total_length = total_length
        self._segment_number = 0
        self._crypttext_hash_tree = None
        self._opened = False
        self._results = results
        results.set_healthy(False)
        results.set_recoverable(False)
        results.set_summary("Not Healthy")

    def got_crypttext_hash_tree(self, crypttext_hashtree):
        self._crypttext_hash_tree = crypttext_hashtree

    def write_segment(self, crypttext):
        self.length += len(crypttext)

        self._crypttext_hasher.update(crypttext)
        if self._crypttext_hash_tree:
            ch = hashutil.crypttext_segment_hasher()
            ch.update(crypttext)
            crypttext_leaves = {self._segment_number: ch.digest()}
            # set_hashes() checks each new leaf against the hashes already
            # validated in the tree, and raises if this segment does not match
            self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves)

        self._segment_number += 1

    def close(self):
        self.crypttext_hash = self._crypttext_hasher.digest()

    def finish(self):
        self._results.set_healthy(True)
        self._results.set_recoverable(True)
        self._results.set_summary("Healthy")
        # the return value of finish() is passed out of FileDownloader._done,
        # but SimpleCHKFileVerifier overrides this with the CheckerResults
        # instance instead.


class SimpleCHKFileVerifier(download.FileDownloader):
    # this reconstructs the crypttext, which verifies that at least 'k' of
    # the shareholders are around and have valid data. It does not check the
    # remaining shareholders, and it cannot verify the plaintext.
    check_plaintext_hash = False

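    # Typical usage (a sketch; k, N, size, and ueb_hash come from the file's
    # URI and UEB, and 'client' is the same object SimpleCHKFileChecker uses):
    #   v = SimpleCHKFileVerifier(client, u, storage_index, k, N, size, ueb_hash)
    #   d = v.start()
    #   d.addCallback(lambda results: ...)  # CheckerResults; healthy only if
    #                                       # the crypttext was reconstructed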
    def __init__(self, client, u, storage_index, k, N, size, ueb_hash):
        precondition(isinstance(u, CHKFileURI), u)
        download.FileDownloader.__init__(self, client, u, None)
        self._client = client

        self._uri = u
        self._storage_index = storage_index
        self._uri_extension_hash = ueb_hash
        self._total_shares = N
        self._size = size
        self._num_needed_shares = k

        self._si_s = storage.si_b2a(self._storage_index)
        self.init_logging()

        self._check_results = r = CheckerResults(self._uri.to_string(), self._storage_index)
        r.set_data({"count-shares-needed": k,
                    "count-shares-expected": N,
                    })
        self._output = VerifyingOutput(self._size, r)
        self._paused = False
        self._stopped = False

        self._results = None
        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
        self._uri_extension_sources = []

        self._uri_extension_data = None

        self._fetch_failures = {"uri_extension": 0,
                                "plaintext_hashroot": 0,
                                "plaintext_hashtree": 0,
                                "crypttext_hashroot": 0,
                                "crypttext_hashtree": 0,
                                }

    def init_logging(self):
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._client.log("SimpleCHKFileVerifier(%s): starting" % prefix)
        self._log_number = num

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs['parent'] = self._log_number
        # add a prefix to the message, regardless of how it is expressed
        prefix = "SimpleCHKFileVerifier(%s): " % self._log_prefix
        if "format" in kwargs:
            kwargs["format"] = prefix + kwargs["format"]
        elif "message" in kwargs:
            kwargs["message"] = prefix + kwargs["message"]
        elif args:
            m = prefix + args[0]
            args = (m,) + args[1:]
        return self._client.log(*args, **kwargs)


    def start(self):
        log.msg("starting download [%s]" % storage.si_b2a(self._storage_index)[:5])

        # first step: who should we download from?
        d = defer.maybeDeferred(self._get_all_shareholders)
        d.addCallback(self._got_all_shareholders)
        # now get the uri_extension block from somebody and validate it
        d.addCallback(self._obtain_uri_extension)
        d.addCallback(self._get_crypttext_hash_tree)
        # once we know that, we can download blocks from everybody
        d.addCallback(self._download_all_segments)
        d.addCallback(self._done)
        d.addCallbacks(self._verify_done, self._verify_failed)
        return d

    def _verify_done(self, ignored):
        # TODO: The following results are just stubs, and need to be replaced
        # with actual values. These exist to make things like deep-check not
        # fail.
        self._check_results.set_needs_rebalancing(False)
        N = self._total_shares
        data = {
            "count-shares-good": N,
            "count-good-share-hosts": N,
            "count-corrupt-shares": 0,
            "list-corrupt-shares": [],
            "servers-responding": [],
            "sharemap": {},
            "count-wrong-shares": 0,
            "count-recoverable-versions": 1,
            "count-unrecoverable-versions": 0,
            }
        self._check_results.set_data(data)
        return self._check_results

    def _verify_failed(self, ignored):
        # TODO: The following results are just stubs, and need to be replaced
        # with actual values. These exist to make things like deep-check not
        # fail.
        self._check_results.set_needs_rebalancing(False)
        N = self._total_shares
        data = {
            "count-shares-good": 0,
            "count-good-share-hosts": 0,
            "count-corrupt-shares": 0,
            "list-corrupt-shares": [],
            "servers-responding": [],
            "sharemap": {},
            "count-wrong-shares": 0,
            "count-recoverable-versions": 0,
            "count-unrecoverable-versions": 1,
            }
        self._check_results.set_data(data)
        return self._check_results