from zope.interface import implements
from twisted.internet import defer
from foolscap.api import DeadReferenceError, RemoteException
from allmydata import hashtree, codec, uri
from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
from allmydata.hashtree import IncompleteHashTree
from allmydata.check_results import CheckResults
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, dictutil, log, mathutil
from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
     block_hash
from allmydata.immutable import layout
class IntegrityCheckReject(Exception):
    """Base class for all errors that cause a share, hash, or UEB to be
    rejected because it fails an integrity check."""

class BadURIExtension(IntegrityCheckReject):
    """The URI extension block (UEB) was malformed or inconsistent with the
    verify-cap."""

class BadURIExtensionHashValue(IntegrityCheckReject):
    """The hash of the URI extension block we fetched did not match the
    expected value from the verify-cap."""

class BadOrMissingHash(IntegrityCheckReject):
    """A hash-tree node was wrong, or the server did not supply enough
    hashes to validate the data."""

class UnsupportedErasureCodec(BadURIExtension):
    """The UEB named an erasure codec other than the supported 'crs'."""
class ValidatedExtendedURIProxy:
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB."""
    # NOTE: the docstring must be the first statement in the class body;
    # previously it followed the implements() call and was a no-op string.
    implements(IValidatedThingProxy)
34 def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
35 # fetch_failures is for debugging -- see test_encode.py
36 self._fetch_failures = fetch_failures
37 self._readbucketproxy = readbucketproxy
38 precondition(IVerifierURI.providedBy(verifycap), verifycap)
39 self._verifycap = verifycap
42 self.segment_size = None
43 self.crypttext_root_hash = None
44 self.share_root_hash = None
47 self.block_size = None
48 self.share_size = None
49 self.num_segments = None
50 self.tail_data_size = None
51 self.tail_segment_size = None
54 self.crypttext_hash = None
57 return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
59 def _check_integrity(self, data):
60 h = uri_extension_hash(data)
61 if h != self._verifycap.uri_extension_hash:
62 msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
63 (self._readbucketproxy,
64 base32.b2a(self._verifycap.uri_extension_hash),
66 if self._fetch_failures is not None:
67 self._fetch_failures["uri_extension"] += 1
68 raise BadURIExtensionHashValue(msg)
72 def _parse_and_validate(self, data):
73 self.share_size = mathutil.div_ceil(self._verifycap.size,
74 self._verifycap.needed_shares)
76 d = uri.unpack_extension(data)
78 # There are several kinds of things that can be found in a UEB.
79 # First, things that we really need to learn from the UEB in order to
80 # do this download. Next: things which are optional but not redundant
81 # -- if they are present in the UEB they will get used. Next, things
82 # that are optional and redundant. These things are required to be
83 # consistent: they don't have to be in the UEB, but if they are in
84 # the UEB then they will be checked for consistency with the
85 # already-known facts, and if they are inconsistent then an exception
86 # will be raised. These things aren't actually used -- they are just
87 # tested for consistency and ignored. Finally: things which are
88 # deprecated -- they ought not be in the UEB at all, and if they are
89 # present then a warning will be logged but they are otherwise
92 # First, things that we really need to learn from the UEB:
93 # segment_size, crypttext_root_hash, and share_root_hash.
94 self.segment_size = d['segment_size']
96 self.block_size = mathutil.div_ceil(self.segment_size,
97 self._verifycap.needed_shares)
98 self.num_segments = mathutil.div_ceil(self._verifycap.size,
101 self.tail_data_size = self._verifycap.size % self.segment_size
102 if not self.tail_data_size:
103 self.tail_data_size = self.segment_size
104 # padding for erasure code
105 self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
106 self._verifycap.needed_shares)
108 # Ciphertext hash tree root is mandatory, so that there is at most
109 # one ciphertext that matches this read-cap or verify-cap. The
110 # integrity check on the shares is not sufficient to prevent the
111 # original encoder from creating some shares of file A and other
113 self.crypttext_root_hash = d['crypttext_root_hash']
115 self.share_root_hash = d['share_root_hash']
118 # Next: things that are optional and not redundant: crypttext_hash
119 if d.has_key('crypttext_hash'):
120 self.crypttext_hash = d['crypttext_hash']
121 if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
122 raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
125 # Next: things that are optional, redundant, and required to be
126 # consistent: codec_name, codec_params, tail_codec_params,
127 # num_segments, size, needed_shares, total_shares
128 if d.has_key('codec_name'):
129 if d['codec_name'] != "crs":
130 raise UnsupportedErasureCodec(d['codec_name'])
132 if d.has_key('codec_params'):
133 ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
134 if ucpss != self.segment_size:
135 raise BadURIExtension("inconsistent erasure code params: "
136 "ucpss: %s != self.segment_size: %s" %
137 (ucpss, self.segment_size))
138 if ucpns != self._verifycap.needed_shares:
139 raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
140 "self._verifycap.needed_shares: %s" %
141 (ucpns, self._verifycap.needed_shares))
142 if ucpts != self._verifycap.total_shares:
143 raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
144 "self._verifycap.total_shares: %s" %
145 (ucpts, self._verifycap.total_shares))
147 if d.has_key('tail_codec_params'):
148 utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
149 if utcpss != self.tail_segment_size:
150 raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
151 "self.tail_segment_size: %s, self._verifycap.size: %s, "
152 "self.segment_size: %s, self._verifycap.needed_shares: %s"
153 % (utcpss, self.tail_segment_size, self._verifycap.size,
154 self.segment_size, self._verifycap.needed_shares))
155 if utcpns != self._verifycap.needed_shares:
156 raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
157 "self._verifycap.needed_shares: %s" % (utcpns,
158 self._verifycap.needed_shares))
159 if utcpts != self._verifycap.total_shares:
160 raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
161 "self._verifycap.total_shares: %s" % (utcpts,
162 self._verifycap.total_shares))
164 if d.has_key('num_segments'):
165 if d['num_segments'] != self.num_segments:
166 raise BadURIExtension("inconsistent num_segments: size: %s, "
167 "segment_size: %s, computed_num_segments: %s, "
168 "ueb_num_segments: %s" % (self._verifycap.size,
170 self.num_segments, d['num_segments']))
172 if d.has_key('size'):
173 if d['size'] != self._verifycap.size:
174 raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
175 (self._verifycap.size, d['size']))
177 if d.has_key('needed_shares'):
178 if d['needed_shares'] != self._verifycap.needed_shares:
179 raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
180 "needed shares: %s" % (self._verifycap.total_shares,
183 if d.has_key('total_shares'):
184 if d['total_shares'] != self._verifycap.total_shares:
185 raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
186 "total shares: %s" % (self._verifycap.total_shares,
189 # Finally, things that are deprecated and ignored: plaintext_hash,
190 # plaintext_root_hash
191 if d.get('plaintext_hash'):
192 log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
193 "and is no longer used. Ignoring. %s" % (self,))
194 if d.get('plaintext_root_hash'):
195 log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
196 "reasons and is no longer used. Ignoring. %s" % (self,))
201 """Fetch the UEB from bucket, compare its hash to the hash from
202 verifycap, then parse it. Returns a deferred which is called back
203 with self once the fetch is successful, or is erred back if it
205 d = self._readbucketproxy.get_uri_extension()
206 d.addCallback(self._check_integrity)
207 d.addCallback(self._parse_and_validate)
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """
217 def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
218 block_size, share_size):
219 """ share_hash_tree is required to have already been initialized with
220 the root hash (the number-0 hash), using the share_root_hash from the
222 precondition(share_hash_tree[0] is not None, share_hash_tree)
223 prefix = "%d-%s-%s" % (sharenum, bucket,
224 base32.b2a_l(share_hash_tree[0][:8], 60))
225 log.PrefixingLogMixin.__init__(self,
226 facility="tahoe.immutable.download",
228 self.sharenum = sharenum
230 self.share_hash_tree = share_hash_tree
231 self.num_blocks = num_blocks
232 self.block_size = block_size
233 self.share_size = share_size
234 self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
236 def get_all_sharehashes(self):
237 """Retrieve and validate all the share-hash-tree nodes that are
238 included in this share, regardless of whether we need them to
239 validate the share or not. Each share contains a minimal Merkle tree
240 chain, but there is lots of overlap, so usually we'll be using hashes
241 from other shares and not reading every single hash from this share.
242 The Verifier uses this function to read and validate every single
243 hash from this share.
245 Call this (and wait for the Deferred it returns to fire) before
246 calling get_block() for the first time: this lets us check that the
247 share share contains enough hashes to validate its own data, and
248 avoids downloading any share hash twice.
250 I return a Deferred which errbacks upon failure, probably with
253 d = self.bucket.get_share_hashes()
254 def _got_share_hashes(sh):
255 sharehashes = dict(sh)
257 self.share_hash_tree.set_hashes(sharehashes)
258 except IndexError, le:
259 raise BadOrMissingHash(le)
260 except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
261 raise BadOrMissingHash(le)
262 d.addCallback(_got_share_hashes)
265 def get_all_blockhashes(self):
266 """Retrieve and validate all the block-hash-tree nodes that are
267 included in this share. Each share contains a full Merkle tree, but
268 we usually only fetch the minimal subset necessary for any particular
269 block. This function fetches everything at once. The Verifier uses
270 this function to validate the block hash tree.
272 Call this (and wait for the Deferred it returns to fire) after
273 calling get_all_sharehashes() and before calling get_block() for the
274 first time: this lets us check that the share contains all block
275 hashes and avoids downloading them multiple times.
277 I return a Deferred which errbacks upon failure, probably with
281 # get_block_hashes(anything) currently always returns everything
282 needed = list(range(len(self.block_hash_tree)))
283 d = self.bucket.get_block_hashes(needed)
284 def _got_block_hashes(blockhashes):
285 if len(blockhashes) < len(self.block_hash_tree):
286 raise BadOrMissingHash()
287 bh = dict(enumerate(blockhashes))
290 self.block_hash_tree.set_hashes(bh)
291 except IndexError, le:
292 raise BadOrMissingHash(le)
293 except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
294 raise BadOrMissingHash(le)
295 d.addCallback(_got_block_hashes)
298 def get_all_crypttext_hashes(self, crypttext_hash_tree):
299 """Retrieve and validate all the crypttext-hash-tree nodes that are
300 in this share. Normally we don't look at these at all: the download
301 process fetches them incrementally as needed to validate each segment
302 of ciphertext. But this is a convenient place to give the Verifier a
303 function to validate all of these at once.
305 Call this with a new hashtree object for each share, initialized with
306 the crypttext hash tree root. I return a Deferred which errbacks upon
307 failure, probably with BadOrMissingHash.
310 # get_crypttext_hashes() always returns everything
311 d = self.bucket.get_crypttext_hashes()
312 def _got_crypttext_hashes(hashes):
313 if len(hashes) < len(crypttext_hash_tree):
314 raise BadOrMissingHash()
315 ct_hashes = dict(enumerate(hashes))
317 crypttext_hash_tree.set_hashes(ct_hashes)
318 except IndexError, le:
319 raise BadOrMissingHash(le)
320 except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
321 raise BadOrMissingHash(le)
322 d.addCallback(_got_crypttext_hashes)
325 def get_block(self, blocknum):
326 # the first time we use this bucket, we need to fetch enough elements
327 # of the share hash tree to validate it from our share hash up to the
329 if self.share_hash_tree.needed_hashes(self.sharenum):
330 d1 = self.bucket.get_share_hashes()
332 d1 = defer.succeed([])
334 # We might need to grab some elements of our block hash tree, to
335 # validate the requested block up to the share hash.
336 blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
337 # We don't need the root of the block hash tree, as that comes in the
339 blockhashesneeded.discard(0)
340 d2 = self.bucket.get_block_hashes(blockhashesneeded)
342 if blocknum < self.num_blocks-1:
343 thisblocksize = self.block_size
345 thisblocksize = self.share_size % self.block_size
346 if thisblocksize == 0:
347 thisblocksize = self.block_size
348 d3 = self.bucket.get_block_data(blocknum,
349 self.block_size, thisblocksize)
351 dl = deferredutil.gatherResults([d1, d2, d3])
352 dl.addCallback(self._got_data, blocknum)
355 def _got_data(self, results, blocknum):
356 precondition(blocknum < self.num_blocks,
357 self, blocknum, self.num_blocks)
358 sharehashes, blockhashes, blockdata = results
360 sharehashes = dict(sharehashes)
361 except ValueError, le:
362 le.args = tuple(le.args + (sharehashes,))
364 blockhashes = dict(enumerate(blockhashes))
366 candidate_share_hash = None # in case we log it in the except block below
367 blockhash = None # in case we log it in the except block below
370 if self.share_hash_tree.needed_hashes(self.sharenum):
371 # This will raise exception if the values being passed do not
372 # match the root node of self.share_hash_tree.
374 self.share_hash_tree.set_hashes(sharehashes)
375 except IndexError, le:
376 # Weird -- sharehashes contained index numbers outside of
377 # the range that fit into this hash tree.
378 raise BadOrMissingHash(le)
380 # To validate a block we need the root of the block hash tree,
381 # which is also one of the leafs of the share hash tree, and is
382 # called "the share hash".
383 if not self.block_hash_tree[0]: # empty -- no root node yet
384 # Get the share hash from the share hash tree.
385 share_hash = self.share_hash_tree.get_leaf(self.sharenum)
387 # No root node in block_hash_tree and also the share hash
388 # wasn't sent by the server.
389 raise hashtree.NotEnoughHashesError
390 self.block_hash_tree.set_hashes({0: share_hash})
392 if self.block_hash_tree.needed_hashes(blocknum):
393 self.block_hash_tree.set_hashes(blockhashes)
395 blockhash = block_hash(blockdata)
396 self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
397 #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
399 # (self.sharenum, blocknum, len(blockdata),
400 # blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
402 except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
403 # log.WEIRD: indicates undetected disk/network error, or more
404 # likely a programming error
405 self.log("hash failure in block=%d, shnum=%d on %s" %
406 (blocknum, self.sharenum, self.bucket))
407 if self.block_hash_tree.needed_hashes(blocknum):
408 self.log(""" failure occurred when checking the block_hash_tree.
409 This suggests that either the block data was bad, or that the
410 block hashes we received along with it were bad.""")
412 self.log(""" the failure probably occurred when checking the
413 share_hash_tree, which suggests that the share hashes we
414 received from the remote peer were bad.""")
415 self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
416 self.log(" block length: %d" % len(blockdata))
417 self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
418 if len(blockdata) < 100:
419 self.log(" block data: %r" % (blockdata,))
421 self.log(" block data start/end: %r .. %r" %
422 (blockdata[:50], blockdata[-50:]))
423 self.log(" share hash tree:\n" + self.share_hash_tree.dump())
424 self.log(" block hash tree:\n" + self.block_hash_tree.dump())
426 for i,h in sorted(sharehashes.items()):
427 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
428 self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
430 for i,h in blockhashes.items():
431 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
432 log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
433 raise BadOrMissingHash(le)
435 # If we made it here, the block is good. If the hash trees didn't
436 # like what they saw, they would have raised a BadHashError, causing
437 # our caller to see a Failure and thus ignore this block (as well as
438 # dropping this bucket).
class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available.

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """
463 def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
465 assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
467 prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
468 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
470 self._verifycap = verifycap
472 self._monitor = monitor
473 self._servers = servers
474 self._verify = verify # bool: verify what the servers claim, or not?
475 self._add_lease = add_lease
477 frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
478 self._verifycap.get_storage_index())
479 self.file_renewal_secret = frs
480 fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
481 self._verifycap.get_storage_index())
482 self.file_cancel_secret = fcs
484 def _get_renewal_secret(self, seed):
485 return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
486 def _get_cancel_secret(self, seed):
487 return bucket_cancel_secret_hash(self.file_cancel_secret, seed)
489 def _get_buckets(self, s, storageindex):
490 """Return a deferred that eventually fires with ({sharenum: bucket},
491 serverid, success). In case the server is disconnected or returns a
492 Failure then it fires with ({}, serverid, False) (A server
493 disconnecting or returning a Failure when we ask it for buckets is
494 the same, for our purposes, as a server that says it has none, except
495 that we want to track and report whether or not each server
499 lease_seed = s.get_lease_seed()
500 serverid = s.get_serverid()
502 renew_secret = self._get_renewal_secret(lease_seed)
503 cancel_secret = self._get_cancel_secret(lease_seed)
504 d2 = rref.callRemote("add_lease", storageindex,
505 renew_secret, cancel_secret)
506 d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)
508 d = rref.callRemote("get_buckets", storageindex)
509 def _wrap_results(res):
510 return (res, serverid, True)
514 if f.check(DeadReferenceError):
516 self.log("failure from server on 'get_buckets' the REMOTE failure was:",
517 facility="tahoe.immutable.checker",
518 failure=f, level=level, umid="AX7wZQ")
519 return ({}, serverid, False)
521 d.addCallbacks(_wrap_results, _trap_errs)
524 def _add_lease_failed(self, f, server_name, storage_index):
525 # Older versions of Tahoe didn't handle the add-lease message very
526 # well: <=1.1.0 throws a NameError because it doesn't implement
527 # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
528 # (which is most of them, since we send add-lease to everybody,
529 # before we know whether or not they have any shares for us), and
530 # 1.2.0 throws KeyError even on known buckets due to an internal bug
531 # in the latency-measuring code.
533 # we want to ignore the known-harmless errors and log the others. In
534 # particular we want to log any local errors caused by coding
537 if f.check(DeadReferenceError):
539 if f.check(RemoteException):
540 if f.value.failure.check(KeyError, IndexError, NameError):
541 # this may ignore a bit too much, but that only hurts us
544 self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
546 f_value=str(f.value),
548 level=log.WEIRD, umid="atbAxw")
550 # local errors are cause for alarm
552 format="local error in add_lease to [%(name)s]: %(f_value)s",
554 f_value=str(f.value),
555 level=log.WEIRD, umid="hEGuQg")
558 def _download_and_verify(self, serverid, sharenum, bucket):
559 """Start an attempt to download and verify every block in this bucket
560 and return a deferred that will eventually fire once the attempt
563 If you download and verify every block then fire with (True,
564 sharenum, None), else if the share data couldn't be parsed because it
565 was of an unknown version number fire with (False, sharenum,
566 'incompatible'), else if any of the blocks were invalid, fire with
567 (False, sharenum, 'corrupt'), else if the server disconnected (False,
568 sharenum, 'disconnect'), else if the server returned a Failure during
569 the process fire with (False, sharenum, 'failure').
571 If there is an internal error such as an uncaught exception in this
572 code, then the deferred will errback, but if there is a remote error
573 such as the server failing or the returned data being incorrect then
574 it will not errback -- it will fire normally with the indicated
577 vcap = self._verifycap
578 b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index())
579 veup = ValidatedExtendedURIProxy(b, vcap)
583 share_hash_tree = IncompleteHashTree(vcap.total_shares)
584 share_hash_tree.set_hashes({0: vup.share_root_hash})
586 vrbp = ValidatedReadBucketProxy(sharenum, b,
592 # note: normal download doesn't use get_all_sharehashes(),
593 # because it gets more data than necessary. We've discussed the
594 # security properties of having verification and download look
595 # identical (so the server couldn't, say, provide good responses
596 # for one and not the other), but I think that full verification
597 # is more important than defending against inconsistent server
598 # behavior. Besides, they can't pass the verifier without storing
599 # all the data, so there's not so much to be gained by behaving
601 d = vrbp.get_all_sharehashes()
602 # we fill share_hash_tree before fetching any blocks, so the
603 # block fetches won't send redundant share-hash-tree requests, to
604 # speed things up. Then we fetch+validate all the blockhashes.
605 d.addCallback(lambda ign: vrbp.get_all_blockhashes())
607 cht = IncompleteHashTree(vup.num_segments)
608 cht.set_hashes({0: vup.crypttext_root_hash})
609 d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))
611 d.addCallback(lambda ign: vrbp)
613 d.addCallback(_got_ueb)
615 def _discard_result(r):
616 assert isinstance(r, str), r
619 def _get_blocks(vrbp):
621 for blocknum in range(veup.num_segments):
622 db = vrbp.get_block(blocknum)
623 db.addCallback(_discard_result)
625 # this gatherResults will fire once every block of this share has
626 # been downloaded and verified, or else it will errback.
627 return deferredutil.gatherResults(ds)
628 d.addCallback(_get_blocks)
630 # if none of those errbacked, the blocks (and the hashes above them)
633 return (True, sharenum, None)
634 d.addCallback(_all_good)
636 # but if anything fails, we'll land here
638 # We didn't succeed at fetching and verifying all the blocks of
639 # this share. Handle each reason for failure differently.
641 if f.check(DeadReferenceError):
642 return (False, sharenum, 'disconnect')
643 elif f.check(RemoteException):
644 return (False, sharenum, 'failure')
645 elif f.check(layout.ShareVersionIncompatible):
646 return (False, sharenum, 'incompatible')
647 elif f.check(layout.LayoutInvalid,
648 layout.RidiculouslyLargeURIExtensionBlock,
650 BadURIExtensionHashValue):
651 return (False, sharenum, 'corrupt')
653 # if it wasn't one of those reasons, re-raise the error
659 def _verify_server_shares(self, s):
660 """ Return a deferred which eventually fires with a tuple of
661 (set(sharenum), serverid, set(corruptsharenum),
662 set(incompatiblesharenum), success) showing all the shares verified
663 to be served by this server, and all the corrupt shares served by the
664 server, and all the incompatible shares served by the server. In case
665 the server is disconnected or returns a Failure then it fires with
666 the last element False.
668 A server disconnecting or returning a failure when we ask it for
669 shares is the same, for our purposes, as a server that says it has
670 none or offers invalid ones, except that we want to track and report
671 the server's behavior. Similarly, the presence of corrupt shares is
672 mainly of use for diagnostics -- you can typically treat it as just
673 like being no share at all by just observing its absence from the
674 verified shares dict and ignoring its presence in the corrupt shares
677 The 'success' argument means whether the server responded to *any*
678 queries during this process, so if it responded to some queries and
679 then disconnected and ceased responding, or returned a failure, it is
680 still marked with the True flag for 'success'.
682 d = self._get_buckets(s, self._verifycap.get_storage_index())
684 def _got_buckets(result):
685 bucketdict, serverid, success = result
688 for (sharenum, bucket) in bucketdict.items():
689 d = self._download_and_verify(serverid, sharenum, bucket)
692 dl = deferredutil.gatherResults(shareverds)
694 def collect(results):
698 for succ, sharenum, whynot in results:
700 verified.add(sharenum)
702 if whynot == 'corrupt':
703 corrupt.add(sharenum)
704 elif whynot == 'incompatible':
705 incompatible.add(sharenum)
706 return (verified, serverid, corrupt, incompatible, success)
708 dl.addCallback(collect)
712 f.trap(RemoteException, DeadReferenceError)
713 return (set(), s.get_serverid(), set(), set(), False)
715 d.addCallbacks(_got_buckets, _err)
718 def _check_server_shares(self, s):
719 """Return a deferred which eventually fires with a tuple of
720 (set(sharenum), serverid, set(), set(), responded) showing all the
721 shares claimed to be served by this server. In case the server is
722 disconnected then it fires with (set() serverid, set(), set(), False)
723 (a server disconnecting when we ask it for buckets is the same, for
724 our purposes, as a server that says it has none, except that we want
725 to track and report whether or not each server responded.)"""
726 def _curry_empty_corrupted(res):
727 buckets, serverid, responded = res
728 return (set(buckets), serverid, set(), set(), responded)
729 d = self._get_buckets(s, self._verifycap.get_storage_index())
730 d.addCallback(_curry_empty_corrupted)
733 def _format_results(self, results):
734 cr = CheckResults(self._verifycap, self._verifycap.get_storage_index())
736 d['count-shares-needed'] = self._verifycap.needed_shares
737 d['count-shares-expected'] = self._verifycap.total_shares
739 verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
740 servers = {} # {serverid: set(sharenums)}
741 corruptsharelocators = [] # (serverid, storageindex, sharenum)
742 incompatiblesharelocators = [] # (serverid, storageindex, sharenum)
744 for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
745 servers.setdefault(thisserverid, set()).update(theseverifiedshares)
746 for sharenum in theseverifiedshares:
747 verifiedshares.setdefault(sharenum, set()).add(thisserverid)
748 for sharenum in thesecorruptshares:
749 corruptsharelocators.append((thisserverid, self._verifycap.get_storage_index(), sharenum))
750 for sharenum in theseincompatibleshares:
751 incompatiblesharelocators.append((thisserverid, self._verifycap.get_storage_index(), sharenum))
753 d['count-shares-good'] = len(verifiedshares)
754 d['count-good-share-hosts'] = len([s for s in servers.keys() if servers[s]])
756 assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
757 if len(verifiedshares) == self._verifycap.total_shares:
759 cr.set_summary("Healthy")
761 cr.set_healthy(False)
762 cr.set_summary("Not Healthy: %d shares (enc %d-of-%d)" %
763 (len(verifiedshares),
764 self._verifycap.needed_shares,
765 self._verifycap.total_shares))
766 if len(verifiedshares) >= self._verifycap.needed_shares:
767 cr.set_recoverable(True)
768 d['count-recoverable-versions'] = 1
769 d['count-unrecoverable-versions'] = 0
771 cr.set_recoverable(False)
772 d['count-recoverable-versions'] = 0
773 d['count-unrecoverable-versions'] = 1
775 d['servers-responding'] = list(servers)
776 d['sharemap'] = verifiedshares
777 # no such thing as wrong shares of an immutable file
778 d['count-wrong-shares'] = 0
779 d['list-corrupt-shares'] = corruptsharelocators
780 d['count-corrupt-shares'] = len(corruptsharelocators)
781 d['list-incompatible-shares'] = incompatiblesharelocators
782 d['count-incompatible-shares'] = len(incompatiblesharelocators)
785 # The file needs rebalancing if the set of servers that have at least
786 # one share is less than the number of uniquely-numbered shares
788 cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])
797 for s in self._servers:
798 ds.append(self._verify_server_shares(s))
800 for s in self._servers:
801 ds.append(self._check_server_shares(s))
803 return deferredutil.gatherResults(ds).addCallback(self._format_results)