from zope.interface import implements
from twisted.internet import defer
from foolscap.api import DeadReferenceError, RemoteException
from allmydata import hashtree, codec, uri
from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
from allmydata.hashtree import IncompleteHashTree
from allmydata.check_results import CheckResults
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, dictutil, log, mathutil
from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
     block_hash
from allmydata.immutable import layout
18 class IntegrityCheckReject(Exception):
20 class BadURIExtension(IntegrityCheckReject):
22 class BadURIExtensionHashValue(IntegrityCheckReject):
24 class BadOrMissingHash(IntegrityCheckReject):
26 class UnsupportedErasureCodec(BadURIExtension):
class ValidatedExtendedURIProxy:
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB."""
    implements(IValidatedThingProxy)

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # Things we really need to learn from the UEB, filled in by
        # _parse_and_validate():
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # Values computed from the required ones above plus the verify-cap:
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # Optional value -- used if the UEB supplies it:
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        """Return 'data' unchanged if it hashes to the uri_extension_hash in
        my verify-cap, else raise BadURIExtensionHashValue."""
        h = uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy,
                    base32.b2a(self._verifycap.uri_extension_hash),
                    base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        return data

    def _parse_and_validate(self, data):
        """Unpack the UEB, fill in my attributes, and cross-check every
        redundant field against the facts already known from the verify-cap.
        Returns self; raises BadURIExtension (or a subclass) on any
        inconsistency."""
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']

        # Next: things that are optional and not redundant: crypttext_hash
        if 'crypttext_hash' in d:
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))

        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if 'codec_name' in d:
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if 'codec_params' in d:
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if 'tail_codec_params' in d:
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if 'num_segments' in d:
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments,
                                                                d['num_segments']))

        if 'size' in d:
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if 'needed_shares' in d:
            if d['needed_shares'] != self._verifycap.needed_shares:
                # note: report the URI's needed_shares (previously this
                # message mistakenly printed total_shares)
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if 'total_shares' in d:
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used. Ignoring. %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used. Ignoring. %s" % (self,))

        return self

    def start(self):
        """Fetch the UEB from bucket, compare its hash to the hash from
        verifycap, then parse it. Returns a deferred which is called back
        with self once the fetch is successful, or is erred back if it
        fails."""
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d
210 class ValidatedReadBucketProxy(log.PrefixingLogMixin):
211 """I am a front-end for a remote storage bucket, responsible for
212 retrieving and validating data from that bucket.
214 My get_block() method is used by BlockDownloaders.
217 def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
218 block_size, share_size):
219 """ share_hash_tree is required to have already been initialized with
220 the root hash (the number-0 hash), using the share_root_hash from the
222 precondition(share_hash_tree[0] is not None, share_hash_tree)
223 prefix = "%d-%s-%s" % (sharenum, bucket,
224 base32.b2a_l(share_hash_tree[0][:8], 60))
225 log.PrefixingLogMixin.__init__(self,
226 facility="tahoe.immutable.download",
228 self.sharenum = sharenum
230 self.share_hash_tree = share_hash_tree
231 self.num_blocks = num_blocks
232 self.block_size = block_size
233 self.share_size = share_size
234 self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
236 def get_all_sharehashes(self):
237 """Retrieve and validate all the share-hash-tree nodes that are
238 included in this share, regardless of whether we need them to
239 validate the share or not. Each share contains a minimal Merkle tree
240 chain, but there is lots of overlap, so usually we'll be using hashes
241 from other shares and not reading every single hash from this share.
242 The Verifier uses this function to read and validate every single
243 hash from this share.
245 Call this (and wait for the Deferred it returns to fire) before
246 calling get_block() for the first time: this lets us check that the
247 share share contains enough hashes to validate its own data, and
248 avoids downloading any share hash twice.
250 I return a Deferred which errbacks upon failure, probably with
253 d = self.bucket.get_share_hashes()
254 def _got_share_hashes(sh):
255 sharehashes = dict(sh)
257 self.share_hash_tree.set_hashes(sharehashes)
258 except IndexError, le:
259 raise BadOrMissingHash(le)
260 except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
261 raise BadOrMissingHash(le)
262 d.addCallback(_got_share_hashes)
265 def get_all_blockhashes(self):
266 """Retrieve and validate all the block-hash-tree nodes that are
267 included in this share. Each share contains a full Merkle tree, but
268 we usually only fetch the minimal subset necessary for any particular
269 block. This function fetches everything at once. The Verifier uses
270 this function to validate the block hash tree.
272 Call this (and wait for the Deferred it returns to fire) after
273 calling get_all_sharehashes() and before calling get_block() for the
274 first time: this lets us check that the share contains all block
275 hashes and avoids downloading them multiple times.
277 I return a Deferred which errbacks upon failure, probably with
281 # get_block_hashes(anything) currently always returns everything
282 needed = list(range(len(self.block_hash_tree)))
283 d = self.bucket.get_block_hashes(needed)
284 def _got_block_hashes(blockhashes):
285 if len(blockhashes) < len(self.block_hash_tree):
286 raise BadOrMissingHash()
287 bh = dict(enumerate(blockhashes))
290 self.block_hash_tree.set_hashes(bh)
291 except IndexError, le:
292 raise BadOrMissingHash(le)
293 except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
294 raise BadOrMissingHash(le)
295 d.addCallback(_got_block_hashes)
298 def get_all_crypttext_hashes(self, crypttext_hash_tree):
299 """Retrieve and validate all the crypttext-hash-tree nodes that are
300 in this share. Normally we don't look at these at all: the download
301 process fetches them incrementally as needed to validate each segment
302 of ciphertext. But this is a convenient place to give the Verifier a
303 function to validate all of these at once.
305 Call this with a new hashtree object for each share, initialized with
306 the crypttext hash tree root. I return a Deferred which errbacks upon
307 failure, probably with BadOrMissingHash.
310 # get_crypttext_hashes() always returns everything
311 d = self.bucket.get_crypttext_hashes()
312 def _got_crypttext_hashes(hashes):
313 if len(hashes) < len(crypttext_hash_tree):
314 raise BadOrMissingHash()
315 ct_hashes = dict(enumerate(hashes))
317 crypttext_hash_tree.set_hashes(ct_hashes)
318 except IndexError, le:
319 raise BadOrMissingHash(le)
320 except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
321 raise BadOrMissingHash(le)
322 d.addCallback(_got_crypttext_hashes)
325 def get_block(self, blocknum):
326 # the first time we use this bucket, we need to fetch enough elements
327 # of the share hash tree to validate it from our share hash up to the
329 if self.share_hash_tree.needed_hashes(self.sharenum):
330 d1 = self.bucket.get_share_hashes()
332 d1 = defer.succeed([])
334 # We might need to grab some elements of our block hash tree, to
335 # validate the requested block up to the share hash.
336 blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
337 # We don't need the root of the block hash tree, as that comes in the
339 blockhashesneeded.discard(0)
340 d2 = self.bucket.get_block_hashes(blockhashesneeded)
342 if blocknum < self.num_blocks-1:
343 thisblocksize = self.block_size
345 thisblocksize = self.share_size % self.block_size
346 if thisblocksize == 0:
347 thisblocksize = self.block_size
348 d3 = self.bucket.get_block_data(blocknum,
349 self.block_size, thisblocksize)
351 dl = deferredutil.gatherResults([d1, d2, d3])
352 dl.addCallback(self._got_data, blocknum)
355 def _got_data(self, results, blocknum):
356 precondition(blocknum < self.num_blocks,
357 self, blocknum, self.num_blocks)
358 sharehashes, blockhashes, blockdata = results
360 sharehashes = dict(sharehashes)
361 except ValueError, le:
362 le.args = tuple(le.args + (sharehashes,))
364 blockhashes = dict(enumerate(blockhashes))
366 candidate_share_hash = None # in case we log it in the except block below
367 blockhash = None # in case we log it in the except block below
370 if self.share_hash_tree.needed_hashes(self.sharenum):
371 # This will raise exception if the values being passed do not
372 # match the root node of self.share_hash_tree.
374 self.share_hash_tree.set_hashes(sharehashes)
375 except IndexError, le:
376 # Weird -- sharehashes contained index numbers outside of
377 # the range that fit into this hash tree.
378 raise BadOrMissingHash(le)
380 # To validate a block we need the root of the block hash tree,
381 # which is also one of the leafs of the share hash tree, and is
382 # called "the share hash".
383 if not self.block_hash_tree[0]: # empty -- no root node yet
384 # Get the share hash from the share hash tree.
385 share_hash = self.share_hash_tree.get_leaf(self.sharenum)
387 # No root node in block_hash_tree and also the share hash
388 # wasn't sent by the server.
389 raise hashtree.NotEnoughHashesError
390 self.block_hash_tree.set_hashes({0: share_hash})
392 if self.block_hash_tree.needed_hashes(blocknum):
393 self.block_hash_tree.set_hashes(blockhashes)
395 blockhash = block_hash(blockdata)
396 self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
397 #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
399 # (self.sharenum, blocknum, len(blockdata),
400 # blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
402 except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
403 # log.WEIRD: indicates undetected disk/network error, or more
404 # likely a programming error
405 self.log("hash failure in block=%d, shnum=%d on %s" %
406 (blocknum, self.sharenum, self.bucket))
407 if self.block_hash_tree.needed_hashes(blocknum):
408 self.log(""" failure occurred when checking the block_hash_tree.
409 This suggests that either the block data was bad, or that the
410 block hashes we received along with it were bad.""")
412 self.log(""" the failure probably occurred when checking the
413 share_hash_tree, which suggests that the share hashes we
414 received from the remote peer were bad.""")
415 self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
416 self.log(" block length: %d" % len(blockdata))
417 self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
418 if len(blockdata) < 100:
419 self.log(" block data: %r" % (blockdata,))
421 self.log(" block data start/end: %r .. %r" %
422 (blockdata[:50], blockdata[-50:]))
423 self.log(" share hash tree:\n" + self.share_hash_tree.dump())
424 self.log(" block hash tree:\n" + self.block_hash_tree.dump())
426 for i,h in sorted(sharehashes.items()):
427 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
428 self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
430 for i,h in blockhashes.items():
431 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
432 log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
433 raise BadOrMissingHash(le)
435 # If we made it here, the block is good. If the hash trees didn't
436 # like what they saw, they would have raised a BadHashError, causing
437 # our caller to see a Failure and thus ignore this block (as well as
438 # dropping this bucket).
class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available.

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method)."""

    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                 monitor):
        # precondition() raises by itself on failure, so wrapping it in
        # 'assert' was redundant (and would be stripped under -O).
        precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))

        prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)

        self._verifycap = verifycap

        self._monitor = monitor
        self._servers = servers
        self._verify = verify # bool: verify what the servers claim, or not?
        self._add_lease = add_lease

        frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
                                       self._verifycap.get_storage_index())
        self.file_renewal_secret = frs
        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
                                      self._verifycap.get_storage_index())
        self.file_cancel_secret = fcs

    def _get_renewal_secret(self, seed):
        # per-bucket renewal secret, derived from the per-file secret
        return bucket_renewal_secret_hash(self.file_renewal_secret, seed)

    def _get_cancel_secret(self, seed):
        # per-bucket cancel secret, derived from the per-file secret
        return bucket_cancel_secret_hash(self.file_cancel_secret, seed)

    def _get_buckets(self, s, storageindex):
        """Return a deferred that eventually fires with ({sharenum: bucket},
        success). In case the server is disconnected or returns a Failure
        then it fires with ({}, False) (A server disconnecting or returning
        a Failure when we ask it for buckets is the same, for our purposes,
        as a server that says it has none, except that we want to track and
        report whether or not each server responded.)"""

        rref = s.get_rref()
        lease_seed = s.get_lease_seed()
        if self._add_lease:
            renew_secret = self._get_renewal_secret(lease_seed)
            cancel_secret = self._get_cancel_secret(lease_seed)
            d2 = rref.callRemote("add_lease", storageindex,
                                 renew_secret, cancel_secret)
            d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)

        d = rref.callRemote("get_buckets", storageindex)
        def _wrap_results(res):
            return (res, True)

        def _trap_errs(f):
            level = log.WEIRD
            if f.check(DeadReferenceError):
                level = log.UNUSUAL
            self.log("failure from server on 'get_buckets' the REMOTE failure was:",
                     facility="tahoe.immutable.checker",
                     failure=f, level=level, umid="AX7wZQ")
            return ({}, False)

        d.addCallbacks(_wrap_results, _trap_errs)
        return d

    def _add_lease_failed(self, f, server_name, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.
        #
        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # errors.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server_name,
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="atbAxw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server_name,
                f_value=str(f.value),
                level=log.WEIRD, umid="hEGuQg")

    def _download_and_verify(self, server, sharenum, bucket):
        """Start an attempt to download and verify every block in this bucket
        and return a deferred that will eventually fire once the attempt
        completes.

        If you download and verify every block then fire with (True,
        sharenum, None), else if the share data couldn't be parsed because it
        was of an unknown version number fire with (False, sharenum,
        'incompatible'), else if any of the blocks were invalid, fire with
        (False, sharenum, 'corrupt'), else if the server disconnected (False,
        sharenum, 'disconnect'), else if the server returned a Failure during
        the process fire with (False, sharenum, 'failure').

        If there is an internal error such as an uncaught exception in this
        code, then the deferred will errback, but if there is a remote error
        such as the server failing or the returned data being incorrect then
        it will not errback -- it will fire normally with the indicated
        results."""

        vcap = self._verifycap
        b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
        veup = ValidatedExtendedURIProxy(b, vcap)
        d = veup.start()

        def _got_ueb(vup):
            share_hash_tree = IncompleteHashTree(vcap.total_shares)
            share_hash_tree.set_hashes({0: vup.share_root_hash})

            vrbp = ValidatedReadBucketProxy(sharenum, b,
                                            share_hash_tree,
                                            vup.num_segments,
                                            vup.block_size,
                                            vup.share_size)

            # note: normal download doesn't use get_all_sharehashes(),
            # because it gets more data than necessary. We've discussed the
            # security properties of having verification and download look
            # identical (so the server couldn't, say, provide good responses
            # for one and not the other), but I think that full verification
            # is more important than defending against inconsistent server
            # behavior. Besides, they can't pass the verifier without storing
            # all the data, so there's not so much to be gained by behaving
            # differently.
            d = vrbp.get_all_sharehashes()
            # we fill share_hash_tree before fetching any blocks, so the
            # block fetches won't send redundant share-hash-tree requests, to
            # speed things up. Then we fetch+validate all the blockhashes.
            d.addCallback(lambda ign: vrbp.get_all_blockhashes())

            cht = IncompleteHashTree(vup.num_segments)
            cht.set_hashes({0: vup.crypttext_root_hash})
            d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))

            d.addCallback(lambda ign: vrbp)
            return d
        d.addCallback(_got_ueb)

        def _discard_result(r):
            assert isinstance(r, str), r
            # to free up the RAM
            return None

        def _get_blocks(vrbp):
            def _get_block(ign, blocknum):
                db = vrbp.get_block(blocknum)
                db.addCallback(_discard_result)
                return db
            dbs = defer.succeed(None)
            for blocknum in range(veup.num_segments):
                dbs.addCallback(_get_block, blocknum)
            # The Deferred we return will fire after every block of this
            # share has been downloaded and verified successfully, or else it
            # will errback as soon as the first error is observed.
            return dbs
        d.addCallback(_get_blocks)

        # if none of those errbacked, the blocks (and the hashes above them)
        # are good
        def _all_good(ign):
            return (True, sharenum, None)
        d.addCallback(_all_good)

        # but if anything fails, we'll land here
        def _errb(f):
            # We didn't succeed at fetching and verifying all the blocks of
            # this share. Handle each reason for failure differently.
            if f.check(DeadReferenceError):
                return (False, sharenum, 'disconnect')
            elif f.check(RemoteException):
                return (False, sharenum, 'failure')
            elif f.check(layout.ShareVersionIncompatible):
                return (False, sharenum, 'incompatible')
            elif f.check(layout.LayoutInvalid,
                         layout.RidiculouslyLargeURIExtensionBlock,
                         BadOrMissingHash,
                         BadURIExtensionHashValue):
                return (False, sharenum, 'corrupt')
            else:
                # if it wasn't one of those reasons, re-raise the error
                return f
        d.addErrback(_errb)

        return d

    def _verify_server_shares(self, s):
        """ Return a deferred which eventually fires with a tuple of
        (set(sharenum), server, set(corruptsharenum),
        set(incompatiblesharenum), success) showing all the shares verified
        to be served by this server, and all the corrupt shares served by the
        server, and all the incompatible shares served by the server. In case
        the server is disconnected or returns a Failure then it fires with
        the last element False.

        A server disconnecting or returning a failure when we ask it for
        shares is the same, for our purposes, as a server that says it has
        none or offers invalid ones, except that we want to track and report
        the server's behavior. Similarly, the presence of corrupt shares is
        mainly of use for diagnostics -- you can typically treat it as just
        like being no share at all by just observing its absence from the
        verified shares dict and ignoring its presence in the corrupt shares
        dict.

        The 'success' argument means whether the server responded to *any*
        queries during this process, so if it responded to some queries and
        then disconnected and ceased responding, or returned a failure, it is
        still marked with the True flag for 'success'.
        """
        d = self._get_buckets(s, self._verifycap.get_storage_index())

        def _got_buckets(result):
            bucketdict, success = result

            shareverds = []
            for (sharenum, bucket) in bucketdict.items():
                d = self._download_and_verify(s, sharenum, bucket)
                shareverds.append(d)

            dl = deferredutil.gatherResults(shareverds)

            def collect(results):
                verified = set()
                corrupt = set()
                incompatible = set()
                for succ, sharenum, whynot in results:
                    if succ:
                        verified.add(sharenum)
                    else:
                        if whynot == 'corrupt':
                            corrupt.add(sharenum)
                        elif whynot == 'incompatible':
                            incompatible.add(sharenum)
                return (verified, s, corrupt, incompatible, success)

            dl.addCallback(collect)
            return dl

        def _err(f):
            f.trap(RemoteException, DeadReferenceError)
            return (set(), s, set(), set(), False)

        d.addCallbacks(_got_buckets, _err)
        return d

    def _check_server_shares(self, s):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), server, set(), set(), responded) showing all the
        shares claimed to be served by this server. In case the server is
        disconnected then it fires with (set(), server, set(), set(), False)
        (a server disconnecting when we ask it for buckets is the same, for
        our purposes, as a server that says it has none, except that we want
        to track and report whether or not each server responded.)"""
        def _curry_empty_corrupted(res):
            buckets, responded = res
            return (set(buckets), s, set(), set(), responded)
        d = self._get_buckets(s, self._verifycap.get_storage_index())
        d.addCallback(_curry_empty_corrupted)
        return d

    def _format_results(self, results):
        """Aggregate the per-server result tuples into a CheckResults."""
        cr = CheckResults(self._verifycap, self._verifycap.get_storage_index())
        d = {}
        d['count-shares-needed'] = self._verifycap.needed_shares
        d['count-shares-expected'] = self._verifycap.total_shares

        verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
        servers = {} # {serverid: set(sharenums)}
        corruptsharelocators = [] # (serverid, storageindex, sharenum)
        incompatiblesharelocators = [] # (serverid, storageindex, sharenum)

        for theseverifiedshares, thisserver, thesecorruptshares, theseincompatibleshares, thisresponded in results:
            thisserverid = thisserver.get_serverid()
            servers.setdefault(thisserverid, set()).update(theseverifiedshares)
            for sharenum in theseverifiedshares:
                verifiedshares.setdefault(sharenum, set()).add(thisserverid)
            for sharenum in thesecorruptshares:
                corruptsharelocators.append((thisserverid, self._verifycap.get_storage_index(), sharenum))
            for sharenum in theseincompatibleshares:
                incompatiblesharelocators.append((thisserverid, self._verifycap.get_storage_index(), sharenum))

        d['count-shares-good'] = len(verifiedshares)
        d['count-good-share-hosts'] = len([s for s in servers.keys() if servers[s]])

        assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
        if len(verifiedshares) == self._verifycap.total_shares:
            cr.set_healthy(True)
            cr.set_summary("Healthy")
        else:
            cr.set_healthy(False)
            cr.set_summary("Not Healthy: %d shares (enc %d-of-%d)" %
                           (len(verifiedshares),
                            self._verifycap.needed_shares,
                            self._verifycap.total_shares))
        if len(verifiedshares) >= self._verifycap.needed_shares:
            cr.set_recoverable(True)
            d['count-recoverable-versions'] = 1
            d['count-unrecoverable-versions'] = 0
        else:
            cr.set_recoverable(False)
            d['count-recoverable-versions'] = 0
            d['count-unrecoverable-versions'] = 1

        d['servers-responding'] = list(servers)
        d['sharemap'] = verifiedshares
        # no such thing as wrong shares of an immutable file
        d['count-wrong-shares'] = 0
        d['list-corrupt-shares'] = corruptsharelocators
        d['count-corrupt-shares'] = len(corruptsharelocators)
        d['list-incompatible-shares'] = incompatiblesharelocators
        d['count-incompatible-shares'] = len(incompatiblesharelocators)

        # The file needs rebalancing if the set of servers that have at least
        # one share is less than the number of uniquely-numbered shares
        # available.
        cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])

        cr.set_data(d)

        return cr

    def start(self):
        """Query (or verify) every server, then fire with the aggregated
        CheckResults."""
        ds = []
        if self._verify:
            for s in self._servers:
                ds.append(self._verify_server_shares(s))
        else:
            for s in self._servers:
                ds.append(self._check_server_shares(s))
        return deferredutil.gatherResults(ds).addCallback(self._format_results)