from zope.interface import implements
from twisted.internet import defer
from foolscap.api import DeadReferenceError, RemoteException
from allmydata import hashtree, codec, uri
from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
from allmydata.hashtree import IncompleteHashTree
from allmydata.check_results import CheckResults
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, dictutil, log, mathutil
from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
     block_hash
from allmydata.immutable import layout

class IntegrityCheckReject(Exception):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass

class ValidatedExtendedURIProxy:
    """I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB."""
    implements(IValidatedThingProxy)

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed from the above
        self.block_size = None # bytes
        self.share_size = None # bytes
        self.num_segments = None
        self.tail_data_size = None # bytes
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        h = uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy,
                    base32.b2a(self._verifycap.uri_extension_hash),
                    base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data

    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.
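
        # For illustration, a sketch of what an unpacked UEB dict might hold
        # (the field names are the ones handled below; the values here are
        # hypothetical):
        #
        #   {'segment_size': 131072,                 # required
        #    'crypttext_root_hash': <32-byte hash>,  # required
        #    'share_root_hash': <32-byte hash>,      # required
        #    'crypttext_hash': <32-byte hash>,       # optional, not redundant
        #    'codec_name': "crs",                    # optional, redundant
        #    'codec_params': "131072-3-10",          # optional, redundant
        #    'num_segments': 8, 'size': 1000000,     # optional, redundant
        #    'needed_shares': 3, 'total_shares': 10} # optional, redundant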

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)
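        # Worked example with hypothetical numbers: for size=1000 bytes,
        # needed_shares=3, and segment_size=128: block_size = div_ceil(128, 3)
        # = 43; num_segments = div_ceil(1000, 128) = 8; tail_data_size =
        # 1000 % 128 = 104; tail_segment_size = next_multiple(104, 3) = 105,
        # so the tail segment divides evenly into needed_shares input blocks
        # for the erasure coder.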

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']

        # Next: things that are optional and not redundant: crypttext_hash
        if 'crypttext_hash' in d:
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))

        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if 'codec_name' in d:
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if 'codec_params' in d:
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if 'tail_codec_params' in d:
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if 'num_segments' in d:
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments,
                                                                d['num_segments']))

        if 'size' in d:
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if 'needed_shares' in d:
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if 'total_shares' in d:
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used. Ignoring. %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used. Ignoring. %s" % (self,))

        return self

201 """Fetch the UEB from bucket, compare its hash to the hash from
202 verifycap, then parse it. Returns a deferred which is called back
203 with self once the fetch is successful, or is erred back if it
205 d = self._readbucketproxy.get_uri_extension()
206 d.addCallback(self._check_integrity)
207 d.addCallback(self._parse_and_validate)
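
# A hedged usage sketch (illustrative only; `rbp` and `vcap` are assumed to
# be a ReadBucketProxy and an IVerifierURI, per the constructor above):
#
#   proxy = ValidatedExtendedURIProxy(rbp, vcap)
#   d = proxy.start()
#   def _done(vup):
#       # vup is the proxy itself, with validated UEB fields filled in
#       print vup.segment_size, vup.num_segments, vup.share_size
#   d.addCallback(_done)
#   # an integrity failure errbacks with BadURIExtensionHashValue or
#   # BadURIExtension instead
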
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size):
        """ share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB."""
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket,
                               base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self,
                                       facility="tahoe.immutable.download",
                                       prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

    def get_all_sharehashes(self):
        """Retrieve and validate all the share-hash-tree nodes that are
        included in this share, regardless of whether we need them to
        validate the share or not. Each share contains a minimal Merkle tree
        chain, but there is lots of overlap, so usually we'll be using hashes
        from other shares and not reading every single hash from this share.
        The Verifier uses this function to read and validate every single
        hash from this share.

        Call this (and wait for the Deferred it returns to fire) before
        calling get_block() for the first time: this lets us check that the
        share contains enough hashes to validate its own data, and avoids
        downloading any share hash twice.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""
        d = self.bucket.get_share_hashes()
        def _got_share_hashes(sh):
            sharehashes = dict(sh)
            try:
                self.share_hash_tree.set_hashes(sharehashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_share_hashes)
        return d

    def get_all_blockhashes(self):
        """Retrieve and validate all the block-hash-tree nodes that are
        included in this share. Each share contains a full Merkle tree, but
        we usually only fetch the minimal subset necessary for any particular
        block. This function fetches everything at once. The Verifier uses
        this function to validate the block hash tree.

        Call this (and wait for the Deferred it returns to fire) after
        calling get_all_sharehashes() and before calling get_block() for the
        first time: this lets us check that the share contains all block
        hashes and avoids downloading them multiple times.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""
        # get_block_hashes(anything) currently always returns everything
        needed = list(range(len(self.block_hash_tree)))
        d = self.bucket.get_block_hashes(needed)
        def _got_block_hashes(blockhashes):
            if len(blockhashes) < len(self.block_hash_tree):
                raise BadOrMissingHash()
            bh = dict(enumerate(blockhashes))

            try:
                self.block_hash_tree.set_hashes(bh)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_block_hashes)
        return d

    def get_all_crypttext_hashes(self, crypttext_hash_tree):
        """Retrieve and validate all the crypttext-hash-tree nodes that are
        in this share. Normally we don't look at these at all: the download
        process fetches them incrementally as needed to validate each segment
        of ciphertext. But this is a convenient place to give the Verifier a
        function to validate all of these at once.

        Call this with a new hashtree object for each share, initialized with
        the crypttext hash tree root. I return a Deferred which errbacks upon
        failure, probably with BadOrMissingHash."""
        # get_crypttext_hashes() always returns everything
        d = self.bucket.get_crypttext_hashes()
        def _got_crypttext_hashes(hashes):
            if len(hashes) < len(crypttext_hash_tree):
                raise BadOrMissingHash()
            ct_hashes = dict(enumerate(hashes))

            try:
                crypttext_hash_tree.set_hashes(ct_hashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_crypttext_hashes)
        return d

    def get_block(self, blocknum):
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum,
                                        self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks,
                     self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below
        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise exception if the values being passed do not
                # match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leafs of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree and also the share hash
                    # wasn't sent by the server.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #         "%r .. %r: %s" %
            #         (self.sharenum, blocknum, len(blockdata),
            #          blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                         (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata
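
# For intuition, a minimal sketch of the hash-tree discipline used above
# (hypothetical standalone example; `trusted_root`, `chain`, and `leaf_hash`
# are assumed values): IncompleteHashTree only accepts hashes that chain up
# to an already-pinned node, so pinning the root first turns every later
# set_hashes() call into an integrity check.
#
#   t = hashtree.IncompleteHashTree(4)          # a tree with 4 leaves
#   t.set_hashes({0: trusted_root})             # pin the root (e.g. from the UEB)
#   t.set_hashes(chain, leaves={2: leaf_hash})  # raises BadHashError or
#                                               # NotEnoughHashesError unless
#                                               # `chain` proves leaf 2 against
#                                               # trusted_root
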
class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available.

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """
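
    # A hedged usage sketch (the `sh` and `mon` collaborators are assumptions
    # based on this constructor's signature):
    #
    #   c = Checker(verifycap, servers, verify=True, add_lease=False,
    #               secret_holder=sh, monitor=mon)
    #   d = c.start()
    #   d.addCallback(lambda cr: ...)  # cr is a CheckResults instance
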
    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                 monitor):
        precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))

        prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)

        self._verifycap = verifycap

        self._monitor = monitor
        self._servers = servers
        self._verify = verify # bool: verify what the servers claim, or not?
        self._add_lease = add_lease

        frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
                                       self._verifycap.get_storage_index())
        self.file_renewal_secret = frs
        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
                                      self._verifycap.get_storage_index())
        self.file_cancel_secret = fcs

    def _get_renewal_secret(self, seed):
        return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
    def _get_cancel_secret(self, seed):
        return bucket_cancel_secret_hash(self.file_cancel_secret, seed)

    def _get_buckets(self, s, storageindex):
        """Return a deferred that eventually fires with ({sharenum: bucket},
        success). In case the server is disconnected or returns a Failure
        then it fires with ({}, False) (a server disconnecting or returning
        a Failure when we ask it for buckets is the same, for our purposes,
        as a server that says it has none, except that we want to track and
        report whether or not each server responded)."""

        rref = s.get_rref()
        lease_seed = s.get_lease_seed()
        if self._add_lease:
            renew_secret = self._get_renewal_secret(lease_seed)
            cancel_secret = self._get_cancel_secret(lease_seed)
            d2 = rref.callRemote("add_lease", storageindex,
                                 renew_secret, cancel_secret)
            d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)

        d = rref.callRemote("get_buckets", storageindex)
        def _wrap_results(res):
            return (res, True)

        def _trap_errs(f):
            level = log.WEIRD
            if f.check(DeadReferenceError):
                level = log.UNUSUAL
            self.log("failure from server on 'get_buckets' the REMOTE failure was:",
                     facility="tahoe.immutable.checker",
                     failure=f, level=level, umid="AX7wZQ")
            return ({}, False)

        d.addCallbacks(_wrap_results, _trap_errs)
        return d

    def _add_lease_failed(self, f, server_name, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # errors.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during compatibility checks with older versions
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server_name,
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="atbAxw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server_name,
                f_value=str(f.value),
                level=log.WEIRD, umid="hEGuQg")

    def _download_and_verify(self, server, sharenum, bucket):
        """Start an attempt to download and verify every block in this bucket
        and return a deferred that will eventually fire once the attempt
        completes.

        If you download and verify every block then fire with (True,
        sharenum, None), else if the share data couldn't be parsed because it
        was of an unknown version number fire with (False, sharenum,
        'incompatible'), else if any of the blocks were invalid, fire with
        (False, sharenum, 'corrupt'), else if the server disconnected (False,
        sharenum, 'disconnect'), else if the server returned a Failure during
        the process fire with (False, sharenum, 'failure').

        If there is an internal error such as an uncaught exception in this
        code, then the deferred will errback, but if there is a remote error
        such as the server failing or the returned data being incorrect then
        it will not errback -- it will fire normally with the indicated
        results."""
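        # e.g. a successful verify of share 3 fires with (True, 3, None); a
        # share whose blocks fail their hash checks fires with
        # (False, 3, 'corrupt').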
        vcap = self._verifycap
        b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
        veup = ValidatedExtendedURIProxy(b, vcap)
        d = veup.start()

        def _got_ueb(vup):
            share_hash_tree = IncompleteHashTree(vcap.total_shares)
            share_hash_tree.set_hashes({0: vup.share_root_hash})

            vrbp = ValidatedReadBucketProxy(sharenum, b,
                                            share_hash_tree,
                                            vup.num_segments,
                                            vup.block_size,
                                            vup.share_size)

            # note: normal download doesn't use get_all_sharehashes(),
            # because it gets more data than necessary. We've discussed the
            # security properties of having verification and download look
            # identical (so the server couldn't, say, provide good responses
            # for one and not the other), but I think that full verification
            # is more important than defending against inconsistent server
            # behavior. Besides, they can't pass the verifier without storing
            # all the data, so there's not so much to be gained by behaving
            # inconsistently.
            d = vrbp.get_all_sharehashes()
            # we fill share_hash_tree before fetching any blocks, so the
            # block fetches won't send redundant share-hash-tree requests, to
            # speed things up. Then we fetch+validate all the blockhashes.
            d.addCallback(lambda ign: vrbp.get_all_blockhashes())

            cht = IncompleteHashTree(vup.num_segments)
            cht.set_hashes({0: vup.crypttext_root_hash})
            d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))

            d.addCallback(lambda ign: vrbp)
            return d
        d.addCallback(_got_ueb)

        def _discard_result(r):
            assert isinstance(r, str), r
            # to free up the RAM
            return None

        def _get_blocks(vrbp):
            def _get_block(ign, blocknum):
                db = vrbp.get_block(blocknum)
                db.addCallback(_discard_result)
                return db

            dbs = defer.succeed(None)
            for blocknum in range(veup.num_segments):
                dbs.addCallback(_get_block, blocknum)

            # The Deferred we return will fire after every block of this
            # share has been downloaded and verified successfully, or else it
            # will errback as soon as the first error is observed.
            return dbs

        d.addCallback(_get_blocks)

        # if none of those errbacked, the blocks (and the hashes above them)
        # are good
        def _all_good(ign):
            return (True, sharenum, None)
        d.addCallback(_all_good)

        # but if anything fails, we'll land here
        def _errb(f):
            # We didn't succeed at fetching and verifying all the blocks of
            # this share. Handle each reason for failure differently.

            if f.check(DeadReferenceError):
                return (False, sharenum, 'disconnect')
            elif f.check(RemoteException):
                return (False, sharenum, 'failure')
            elif f.check(layout.ShareVersionIncompatible):
                return (False, sharenum, 'incompatible')
            elif f.check(layout.LayoutInvalid,
                         layout.RidiculouslyLargeURIExtensionBlock,
                         BadOrMissingHash,
                         BadURIExtensionHashValue):
                return (False, sharenum, 'corrupt')
            else:
                # if it wasn't one of those reasons, re-raise the error
                return f
        d.addErrback(_errb)

        return d

    def _verify_server_shares(self, s):
        """ Return a deferred which eventually fires with a tuple of
        (set(sharenum), server, set(corruptsharenum),
        set(incompatiblesharenum), success) showing all the shares verified
        to be served by this server, and all the corrupt shares served by the
        server, and all the incompatible shares served by the server. In case
        the server is disconnected or returns a Failure then it fires with
        the last element False.

        A server disconnecting or returning a failure when we ask it for
        shares is the same, for our purposes, as a server that says it has
        none or offers invalid ones, except that we want to track and report
        the server's behavior. Similarly, the presence of corrupt shares is
        mainly of use for diagnostics -- you can typically treat it as just
        like being no share at all by just observing its absence from the
        verified shares dict and ignoring its presence in the corrupt shares
        set.

        The 'success' argument means whether the server responded to *any*
        queries during this process, so if it responded to some queries and
        then disconnected and ceased responding, or returned a failure, it is
        still marked with the True flag for 'success'.
        """
        d = self._get_buckets(s, self._verifycap.get_storage_index())

        def _got_buckets(result):
            bucketdict, success = result

            shareverds = []
            for (sharenum, bucket) in bucketdict.items():
                d = self._download_and_verify(s, sharenum, bucket)
                shareverds.append(d)

            dl = deferredutil.gatherResults(shareverds)

            def collect(results):
                verified = set()
                corrupt = set()
                incompatible = set()
                for succ, sharenum, whynot in results:
                    if succ:
                        verified.add(sharenum)
                    else:
                        if whynot == 'corrupt':
                            corrupt.add(sharenum)
                        elif whynot == 'incompatible':
                            incompatible.add(sharenum)
                return (verified, s, corrupt, incompatible, success)

            dl.addCallback(collect)
            return dl

        def _err(f):
            f.trap(RemoteException, DeadReferenceError)
            return (set(), s, set(), set(), False)

        d.addCallbacks(_got_buckets, _err)
        return d

    def _check_server_shares(self, s):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), server, set(), set(), responded) showing all the
        shares claimed to be served by this server. In case the server is
        disconnected then it fires with (set(), server, set(), set(), False)
        (a server disconnecting when we ask it for buckets is the same, for
        our purposes, as a server that says it has none, except that we want
        to track and report whether or not each server responded.)"""
        def _curry_empty_corrupted(res):
            buckets, responded = res
            return (set(buckets), s, set(), set(), responded)
        d = self._get_buckets(s, self._verifycap.get_storage_index())
        d.addCallback(_curry_empty_corrupted)
        return d

    def _format_results(self, results):
        SI = self._verifycap.get_storage_index()

        verifiedshares = dictutil.DictOfSets() # {sharenum: set(server)}
        servers = {} # {server: set(sharenums)}
        corruptshare_locators = [] # (server, storageindex, sharenum)
        incompatibleshare_locators = [] # (server, storageindex, sharenum)
        servers_responding = set() # server

        for verified, server, corrupt, incompatible, responded in results:
            servers.setdefault(server, set()).update(verified)
            for sharenum in verified:
                verifiedshares.setdefault(sharenum, set()).add(server)
            for sharenum in corrupt:
                corruptshare_locators.append((server, SI, sharenum))
            for sharenum in incompatible:
                incompatibleshare_locators.append((server, SI, sharenum))
            if responded:
                servers_responding.add(server)

        good_share_hosts = len([s for s in servers.keys() if servers[s]])

        assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
        if len(verifiedshares) == self._verifycap.total_shares:
            healthy = True
            summary = "Healthy"
        else:
            healthy = False
            summary = ("Not Healthy: %d shares (enc %d-of-%d)" %
                       (len(verifiedshares),
                        self._verifycap.needed_shares,
                        self._verifycap.total_shares))
        if len(verifiedshares) >= self._verifycap.needed_shares:
            recoverable = 1
            unrecoverable = 0
        else:
            recoverable = 0
            unrecoverable = 1

        # The file needs rebalancing if the set of servers that have at least
        # one share is less than the number of uniquely-numbered shares
        # available.
        needs_rebalancing = bool(good_share_hosts < len(verifiedshares))
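        # Example (hypothetical): 7 distinct verified share numbers held by
        # only 3 servers gives good_share_hosts=3 < len(verifiedshares)=7,
        # so the shares would benefit from being spread over more servers.
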
        cr = CheckResults(self._verifycap, SI,
                          healthy=healthy, recoverable=bool(recoverable),
                          needs_rebalancing=needs_rebalancing,
                          count_shares_needed=self._verifycap.needed_shares,
                          count_shares_expected=self._verifycap.total_shares,
                          count_shares_good=len(verifiedshares),
                          count_good_share_hosts=good_share_hosts,
                          count_recoverable_versions=recoverable,
                          count_unrecoverable_versions=unrecoverable,
                          servers_responding=list(servers_responding),
                          sharemap=verifiedshares,
                          count_wrong_shares=0, # no such thing, for immutable
                          list_corrupt_shares=corruptshare_locators,
                          count_corrupt_shares=len(corruptshare_locators),
                          list_incompatible_shares=incompatibleshare_locators,
                          count_incompatible_shares=len(incompatibleshare_locators),
                          summary=summary)
        return cr

    def start(self):
        ds = []
        if self._verify:
            for s in self._servers:
                ds.append(self._verify_server_shares(s))
        else:
            for s in self._servers:
                ds.append(self._check_server_shares(s))

        return deferredutil.gatherResults(ds).addCallback(self._format_results)