"""
Given a StorageIndex, count how many shares we can find.

This does no verification of the shares whatsoever. If the peer claims to
have the share, we believe them.
"""

from twisted.internet import defer
from twisted.python import log

from allmydata import storage
from allmydata.checker_results import CheckerResults
from allmydata.immutable import download
from allmydata.uri import CHKFileURI
from allmydata.util import hashutil
from allmydata.util.assertutil import precondition


class SimpleCHKFileChecker:
    """Count the shares that storage peers claim to hold for one CHK file.

    Every peer yielded by ``client.get_permuted_peers`` is asked for its
    buckets for ``storage_index``. The answers are tallied into
    ``found_shares`` (set of share numbers), ``sharemap`` (share number ->
    list of peerids that reported it) and ``responded`` (set of peerids that
    answered), and are finally summarised in a ``CheckerResults``. No share
    data is fetched or verified: a peer's claim is taken at face value.

    NOTE(review): this class was rebuilt from a listing that had stale line
    numbers fused into every line, no indentation, and several statements
    missing. Lines tagged "reconstructed" are inferred from how the
    surviving code uses them; confirm them against a pristine copy.
    """

    def __init__(self, client, uri, storage_index, needed_shares, total_shares):
        """Remember the file's parameters and start with empty tallies.

        ``client`` must provide ``get_permuted_peers``; ``uri`` must provide
        ``to_string()`` (used when the results object is built).
        """
        self.peer_getter = client.get_permuted_peers
        self.needed_shares = needed_shares
        self.total_shares = total_shares
        self.found_shares = set()
        # reconstructed: _done() reads self.uri.to_string()
        self.uri = uri
        self.storage_index = storage_index
        # reconstructed: _got_response() and _done() use self.sharemap
        self.sharemap = {}
        self.responded = set()

    # Illustrative sketch only (not a live method): how this class would look
    # with synchronous messages (or promises).
    # NOTE(review): kept as a comment because it cannot run as written -- it
    # unpacks 3-tuples from peer_getter(storage_index), unlike the live code
    # below, and names `storage_index` although its parameter is `si`. The
    # lines that presumably disabled it were lost in extraction; confirm.
    #
    # def check_synchronously(self, si):
    #     found = set()
    #     for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
    #         buckets = connection.get_buckets(si)
    #         found.update(buckets.keys())
    #     return found

    def start(self):
        """Query every peer; return a Deferred that fires with the results.

        NOTE(review): the ``def`` line and the ``return`` were lost in
        extraction. The name ``start`` is an assumption (it mirrors the
        downloader-style entry point used by the verifier in this file);
        confirm against the callers.
        """
        d = self._get_all_shareholders(self.storage_index)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        """Send ``get_buckets`` to each peer; return a DeferredList of them."""
        queries = []  # reconstructed: the list handed to DeferredList
        for (peerid, ss) in self.peer_getter("storage", storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            queries.append(d)  # reconstructed
        return defer.DeferredList(queries)

    def _got_response(self, buckets, peerid):
        """Record the share numbers that ``peerid`` reports holding."""
        # buckets is a dict: maps shnum to an rref of the server who holds it
        self.found_shares.update(buckets.keys())
        for shnum in buckets:
            self.sharemap.setdefault(shnum, []).append(peerid)
        self.responded.add(peerid)

    def _got_error(self, f):
        """Handle a failed ``get_buckets`` query.

        NOTE(review): the original body was lost in extraction. Logging the
        failure is an assumption; returning None means the failing peer is
        simply left out of ``responded`` and the other queries still count.
        """
        log.err(f)

    def _done(self, res):
        """Turn the accumulated tallies into a ``CheckerResults``.

        ``res`` (the DeferredList outcome) is ignored; everything is read
        from the instance. The file is "healthy" when at least
        ``total_shares`` distinct shares were seen and "recoverable" when at
        least ``needed_shares`` were.
        """
        r = CheckerResults(self.uri.to_string(), self.storage_index)
        report = []  # reconstructed: appended to below
        num_found = len(self.found_shares)

        healthy = bool(num_found >= self.total_shares)
        r.set_healthy(healthy)
        recoverable = bool(num_found >= self.needed_shares)
        r.set_recoverable(recoverable)

        data = {"count-shares-good": num_found,
                "count-shares-needed": self.needed_shares,
                "count-shares-expected": self.total_shares,
                "count-wrong-shares": 0,
                }
        # reconstructed branch headers: the two assignment pairs survive
        if recoverable:
            data["count-recoverable-versions"] = 1
            data["count-unrecoverable-versions"] = 0
        else:
            data["count-recoverable-versions"] = 0
            data["count-unrecoverable-versions"] = 1

        data["count-corrupt-shares"] = 0  # non-verifier doesn't see corruption
        data["list-corrupt-shares"] = []

        hosts = set()  # reconstructed: every peerid holding at least one share
        sharemap = {}  # reconstructed: copy of self.sharemap for the results
        for (shnum, nodeids) in self.sharemap.items():
            hosts.update(nodeids)  # reconstructed
            sharemap[shnum] = nodeids
        data["count-good-share-hosts"] = len(hosts)
        data["servers-responding"] = list(self.responded)
        data["sharemap"] = sharemap

        # reconstructed: `data` is otherwise unused; set_data() is the call
        # the verifier in this file uses on the same results type
        r.set_data(data)
        # more distinct shares than distinct hosts: some host holds several
        r.set_needs_rebalancing(bool(num_found > len(hosts)))

        if num_found < self.total_shares:
            wanted = set(range(self.total_shares))
            missing = wanted - self.found_shares
            report.append("Missing shares: %s" %
                          ",".join(["sh%d" % shnum
                                    for shnum in sorted(missing)]))
        # NOTE(review): the line that consumed `report` was lost; set_report()
        # is an assumption -- confirm against CheckerResults
        r.set_report(report)

        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Not Healthy")
        return r  # reconstructed


class VerifyingOutput:
    """Download target that hashes crypttext instead of storing it.

    The results object passed in is marked unhealthy/unrecoverable up front
    and is only flipped to healthy when ``finish()`` is called.

    NOTE(review): rebuilt from a listing with fused line numbers, no
    indentation and missing statements; lines tagged "reconstructed" are
    inferred from the surviving code.
    """

    def __init__(self, total_length, results):
        """Start with an empty hasher and a pessimistic ``results``."""
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self.length = 0  # reconstructed: write_segment() does `+=` on it
        self.total_length = total_length
        self._segment_number = 0
        self._crypttext_hash_tree = None
        self._results = results
        results.set_healthy(False)
        results.set_recoverable(False)
        results.set_summary("Not Healthy")

    def got_crypttext_hash_tree(self, crypttext_hashtree):
        """Remember the hash tree that later segments are fed into."""
        self._crypttext_hash_tree = crypttext_hashtree

    def write_segment(self, crypttext):
        """Account for one crypttext segment.

        Adds its length to ``self.length``, feeds it to the whole-file
        hasher and, when a hash tree has been supplied, submits this
        segment's hash as the leaf for the current segment number.
        """
        self.length += len(crypttext)

        self._crypttext_hasher.update(crypttext)
        if self._crypttext_hash_tree:
            ch = hashutil.crypttext_segment_hasher()
            # reconstructed: without it the leaf would hash no data at all
            ch.update(crypttext)
            crypttext_leaves = {self._segment_number: ch.digest()}
            # presumably set_hashes() rejects a leaf that contradicts the
            # tree -- confirm against the hash tree implementation
            self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves)

        self._segment_number += 1

    def close(self):
        """Freeze the whole-file crypttext hash into ``crypttext_hash``.

        NOTE(review): the ``def`` line was lost; the name ``close`` is an
        assumption -- confirm against the downloader that drives this object.
        """
        self.crypttext_hash = self._crypttext_hasher.digest()

    def finish(self):
        """Mark the results healthy and recoverable; returns None."""
        self._results.set_healthy(True)
        self._results.set_recoverable(True)
        self._results.set_summary("Healthy")
        # the return value of finish() is passed out of FileDownloader._done,
        # but SimpleCHKFileVerifier overrides this with the CheckerResults


class SimpleCHKFileVerifier(download.FileDownloader):
    """Verify a CHK file by running the download pipeline into a hasher.

    NOTE(review): rebuilt from a listing with fused line numbers, no
    indentation and missing statements. Lines tagged "reconstructed" or
    "presumably" are inferred; confirm them against a pristine copy and
    against ``download.FileDownloader``.
    """
    # this reconstructs the crypttext, which verifies that at least 'k' of
    # the shareholders are around and have valid data. It does not check the
    # remaining shareholders, and it cannot verify the plaintext.
    check_plaintext_hash = False

    def __init__(self, client, u, storage_index, k, N, size, ueb_hash):
        """Set up downloader state and a pessimistic ``CheckerResults``.

        ``u`` must be a ``CHKFileURI``; ``k``/``N`` are the needed/total
        share counts recorded in the results.
        """
        precondition(isinstance(u, CHKFileURI), u)
        download.FileDownloader.__init__(self, client, u, None)
        self._client = client

        self._uri = u  # reconstructed: read below via self._uri.to_string()
        self._storage_index = storage_index
        self._uri_extension_hash = ueb_hash
        self._total_shares = N
        self._size = size  # reconstructed: read below; `size` otherwise unused
        self._num_needed_shares = k

        self._si_s = storage.si_b2a(self._storage_index)
        # reconstructed: log() needs _log_prefix/_log_number, which only
        # init_logging() sets in this class
        self.init_logging()

        self._check_results = r = CheckerResults(self._uri.to_string(),
                                                 self._storage_index)
        r.set_data({"count-shares-needed": k,
                    "count-shares-expected": N,
                    })
        self._output = VerifyingOutput(self._size, r)
        # presumably reset next to _stopped, as the base downloader does --
        # NOTE(review): this line was lost in extraction; confirm
        self._paused = False
        self._stopped = False

        # presumably cleared so the base class collects no download
        # statistics -- NOTE(review): lost line; confirm
        self._results = None
        self.active_buckets = {}  # k: shnum, v: bucket
        self._share_buckets = []  # list of (sharenum, bucket) tuples
        self._share_vbuckets = {}  # k: shnum, v: set of ValidatedBuckets
        self._uri_extension_sources = []

        self._uri_extension_data = None

        self._fetch_failures = {"uri_extension": 0,
                                "plaintext_hashroot": 0,
                                "plaintext_hashtree": 0,
                                "crypttext_hashroot": 0,
                                "crypttext_hashtree": 0,
                                }

    def init_logging(self):
        """Emit the "starting" log event and remember it as the log parent."""
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._client.log("SimpleCHKFileVerifier(%s): starting" % prefix)
        self._log_number = num

    def log(self, *args, **kwargs):
        """Forward to ``client.log`` with this verifier's prefix and parent."""
        if "parent" not in kwargs:
            kwargs['parent'] = self._log_number
        # add a prefix to the message, regardless of how it is expressed
        prefix = "SimpleCHKFileVerifier(%s): " % self._log_prefix
        if "format" in kwargs:
            kwargs["format"] = prefix + kwargs["format"]
        elif "message" in kwargs:
            kwargs["message"] = prefix + kwargs["message"]
        elif args:
            # reconstructed branch: the surviving line rebuilds `args` from a
            # prefixed first positional message
            m = prefix + args[0]
            args = (m,) + args[1:]
        return self._client.log(*args, **kwargs)

    def start(self):
        """Run the verification; return a Deferred firing with the results.

        Both success and failure of the pipeline end in the
        ``CheckerResults`` object (see ``_verify_done``/``_verify_failed``).

        NOTE(review): the ``def`` line and the ``return`` were lost; the name
        ``start`` is an assumption -- confirm against the base class.
        """
        log.msg("starting download [%s]" % storage.si_b2a(self._storage_index)[:5])

        # first step: who should we download from?
        d = defer.maybeDeferred(self._get_all_shareholders)
        d.addCallback(self._got_all_shareholders)
        # now get the uri_extension block from somebody and validate it
        d.addCallback(self._obtain_uri_extension)
        d.addCallback(self._get_crypttext_hash_tree)
        # once we know that, we can download blocks from everybody
        d.addCallback(self._download_all_segments)
        d.addCallback(self._done)
        d.addCallbacks(self._verify_done, self._verify_failed)
        return d

    def _set_stub_results(self, shares_good, recoverable):
        """Fill the results with placeholder data; return the results."""
        # TODO: The following results are just stubs, and need to be replaced
        # with actual values. These exist to make things like deep-check not
        # fail.
        self._check_results.set_needs_rebalancing(False)
        data = {
            "count-shares-good": shares_good,
            "count-good-share-hosts": shares_good,
            "count-corrupt-shares": 0,
            "list-corrupt-shares": [],
            "servers-responding": [],
            # NOTE(review): one entry was lost here in extraction; "sharemap"
            # is an assumption based on the keys the non-verifying checker
            # in this file reports -- confirm
            "sharemap": {},
            "count-wrong-shares": 0,
            "count-recoverable-versions": 1 if recoverable else 0,
            "count-unrecoverable-versions": 0 if recoverable else 1,
            }
        self._check_results.set_data(data)
        return self._check_results

    def _verify_done(self, ignored):
        """Success path: report all N shares as good (stub values)."""
        return self._set_stub_results(self._total_shares, True)

    def _verify_failed(self, ignored):
        """Failure path: discard the failure, report zero good shares."""
        return self._set_stub_results(0, False)