from zope.interface import implements
from twisted.internet import defer
from foolscap.api import DeadReferenceError, RemoteException
from allmydata import hashtree, codec, uri
from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
from allmydata.hashtree import IncompleteHashTree
from allmydata.check_results import CheckResults
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, dictutil, log, mathutil
from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
     block_hash

from allmydata.immutable import layout

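
# Exception hierarchy used by this module: IntegrityCheckReject is the base
# class for every integrity failure raised here, and UnsupportedErasureCodec
# is a particular kind of BadURIExtension.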
class IntegrityCheckReject(Exception):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass

class ValidatedExtendedURIProxy:
    """I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB."""
    implements(IValidatedThingProxy)

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        h = uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy,
                    base32.b2a(self._verifycap.uri_extension_hash),
                    base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data

    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.
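
        # A hedged sketch (the field names match the parsing below; the
        # values are invented for illustration): the dict returned by
        # uri.unpack_extension() might look like
        #
        #   { 'segment_size': 131072,                  # required
        #     'crypttext_root_hash': <32-byte hash>,   # required
        #     'share_root_hash': <32-byte hash>,       # required
        #     'crypttext_hash': <32-byte hash>,        # optional
        #     'codec_name': 'crs',                     # optional, redundant
        #     'codec_params': '131072-3-10',           # optional, redundant
        #     'size': 10000000,                        # optional, redundant
        #     'plaintext_hash': <32-byte hash> }       # deprecated, ignored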

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)
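        # Worked example (numbers invented for illustration): with
        # verifycap.size = 10000, segment_size = 4096, and needed_shares = 3:
        #   block_size        = div_ceil(4096, 3)      = 1366
        #   num_segments      = div_ceil(10000, 4096)  = 3
        #   tail_data_size    = 10000 % 4096           = 1808
        #   tail_segment_size = next_multiple(1808, 3) = 1809
        # i.e. the tail segment is padded by one byte before erasure coding.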

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']


        # Next: things that are optional and not redundant: crypttext_hash
        if 'crypttext_hash' in d:
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))


        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if 'codec_name' in d:
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if 'codec_params' in d:
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if 'tail_codec_params' in d:
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if 'num_segments' in d:
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if 'size' in d:
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if 'needed_shares' in d:
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if 'total_shares' in d:
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used.  Ignoring.  %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used.  Ignoring.  %s" % (self,))

        return self

    def start(self):
        """Fetch the UEB from bucket, compare its hash to the hash from
        verifycap, then parse it. Returns a deferred which is called back
        with self once the fetch is successful, or is errbacked if it
        fails."""
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d
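
    # Hedged usage sketch (the names 'rbp' and 'vcap' are hypothetical):
    #
    #   veup = ValidatedExtendedURIProxy(rbp, vcap)
    #   d = veup.start()
    #   d.addCallback(lambda vup: vup.num_segments)
    #
    # Because start() fires with the proxy itself, callbacks can read the
    # validated attributes (segment_size, num_segments, etc.) directly.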

class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size):
        """ share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB"""
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket,
                               base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self,
                                       facility="tahoe.immutable.download",
                                       prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

    def get_all_sharehashes(self):
        """Retrieve and validate all the share-hash-tree nodes that are
        included in this share, regardless of whether we need them to
        validate the share or not. Each share contains a minimal Merkle tree
        chain, but there is lots of overlap, so usually we'll be using hashes
        from other shares and not reading every single hash from this share.
        The Verifier uses this function to read and validate every single
        hash from this share.

        Call this (and wait for the Deferred it returns to fire) before
        calling get_block() for the first time: this lets us check that the
        share contains enough hashes to validate its own data, and avoids
        downloading any share hash twice.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""

        d = self.bucket.get_share_hashes()
        def _got_share_hashes(sh):
            sharehashes = dict(sh)
            try:
                self.share_hash_tree.set_hashes(sharehashes)
            except IndexError as le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError) as le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_share_hashes)
        return d

    def get_all_blockhashes(self):
        """Retrieve and validate all the block-hash-tree nodes that are
        included in this share. Each share contains a full Merkle tree, but
        we usually only fetch the minimal subset necessary for any particular
        block. This function fetches everything at once. The Verifier uses
        this function to validate the block hash tree.

        Call this (and wait for the Deferred it returns to fire) after
        calling get_all_sharehashes() and before calling get_block() for the
        first time: this lets us check that the share contains all block
        hashes and avoids downloading them multiple times.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash.
        """

        # get_block_hashes(anything) currently always returns everything
        needed = list(range(len(self.block_hash_tree)))
        d = self.bucket.get_block_hashes(needed)
        def _got_block_hashes(blockhashes):
            if len(blockhashes) < len(self.block_hash_tree):
                raise BadOrMissingHash()
            bh = dict(enumerate(blockhashes))

            try:
                self.block_hash_tree.set_hashes(bh)
            except IndexError as le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError) as le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_block_hashes)
        return d

    def get_all_crypttext_hashes(self, crypttext_hash_tree):
        """Retrieve and validate all the crypttext-hash-tree nodes that are
        in this share. Normally we don't look at these at all: the download
        process fetches them incrementally as needed to validate each segment
        of ciphertext. But this is a convenient place to give the Verifier a
        function to validate all of these at once.

        Call this with a new hashtree object for each share, initialized with
        the crypttext hash tree root. I return a Deferred which errbacks upon
        failure, probably with BadOrMissingHash.
        """

        # get_crypttext_hashes() always returns everything
        d = self.bucket.get_crypttext_hashes()
        def _got_crypttext_hashes(hashes):
            if len(hashes) < len(crypttext_hash_tree):
                raise BadOrMissingHash()
            ct_hashes = dict(enumerate(hashes))
            try:
                crypttext_hash_tree.set_hashes(ct_hashes)
            except IndexError as le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError) as le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_crypttext_hashes)
        return d
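
    # Hedged sketch of the Verifier's call order (see _download_and_verify
    # below for the real sequence):
    #
    #   d = vrbp.get_all_sharehashes()
    #   d.addCallback(lambda ign: vrbp.get_all_blockhashes())
    #   d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))
    #   d.addCallback(lambda ign: vrbp.get_block(0))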

    def get_block(self, blocknum):
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
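        # Worked example (numbers invented for illustration): with
        # share_size = 5000 and block_size = 1366, the last block holds
        # 5000 % 1366 = 902 bytes; if share_size were an exact multiple of
        # block_size, the modulo would be 0 and the tail block would be a
        # full block_size bytes.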
        d3 = self.bucket.get_block_data(blocknum,
                                        self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks,
                     self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError as le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise an exception if the values being passed do
                # not match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError as le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leaves of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree and also the share hash
                    # wasn't sent by the server.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #        "%r .. %r: %s" %
            #        (self.sharenum, blocknum, len(blockdata),
            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError) as le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                    (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                        (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata


class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available.

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """
    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                 monitor):
        # note: no 'assert' in front of this precondition() call -- wrapping
        # it in 'assert' would make the check disappear under 'python -O'
        precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))

        prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)

        self._verifycap = verifycap

        self._monitor = monitor
        self._servers = servers
        self._verify = verify # bool: verify what the servers claim, or not?
        self._add_lease = add_lease

        frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
                                       self._verifycap.get_storage_index())
        self.file_renewal_secret = frs
        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
                                      self._verifycap.get_storage_index())
        self.file_cancel_secret = fcs

    def _get_renewal_secret(self, seed):
        return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
    def _get_cancel_secret(self, seed):
        return bucket_cancel_secret_hash(self.file_cancel_secret, seed)
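
    # The lease secrets are derived in two steps: __init__ hashes the
    # client-wide secret together with the storage index to get a per-file
    # secret, and these helpers hash the per-file secret together with a
    # server's lease seed to get the per-bucket secret actually sent to that
    # server. Hedged sketch (the variable names are illustrative):
    #
    #   frs = file_renewal_secret_hash(client_renewal_secret, storage_index)
    #   brs = bucket_renewal_secret_hash(frs, lease_seed)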

    def _get_buckets(self, s, storageindex):
        """Return a deferred that eventually fires with ({sharenum: bucket},
        success). In case the server is disconnected or returns a Failure
        then it fires with ({}, False). (A server disconnecting or returning
        a Failure when we ask it for buckets is the same, for our purposes,
        as a server that says it has none, except that we want to track and
        report whether or not each server responded.)"""

        rref = s.get_rref()
        lease_seed = s.get_lease_seed()
        if self._add_lease:
            renew_secret = self._get_renewal_secret(lease_seed)
            cancel_secret = self._get_cancel_secret(lease_seed)
            d2 = rref.callRemote("add_lease", storageindex,
                                 renew_secret, cancel_secret)
            d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)

        d = rref.callRemote("get_buckets", storageindex)
        def _wrap_results(res):
            return (res, True)

        def _trap_errs(f):
            level = log.WEIRD
            if f.check(DeadReferenceError):
                level = log.UNUSUAL
            self.log("failure from server on 'get_buckets'; the REMOTE failure was:",
                     facility="tahoe.immutable.checker",
                     failure=f, level=level, umid="AX7wZQ")
            return ({}, False)

        d.addCallbacks(_wrap_results, _trap_errs)
        return d

    def _add_lease_failed(self, f, server_name, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # We want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # problems.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server_name,
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="atbAxw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server_name,
                f_value=str(f.value),
                level=log.WEIRD, umid="hEGuQg")


    def _download_and_verify(self, server, sharenum, bucket):
        """Start an attempt to download and verify every block in this bucket
        and return a deferred that will eventually fire once the attempt
        completes.

        If you download and verify every block then fire with (True,
        sharenum, None), else if the share data couldn't be parsed because it
        was of an unknown version number fire with (False, sharenum,
        'incompatible'), else if any of the blocks were invalid, fire with
        (False, sharenum, 'corrupt'), else if the server disconnected (False,
        sharenum, 'disconnect'), else if the server returned a Failure during
        the process fire with (False, sharenum, 'failure').

        If there is an internal error such as an uncaught exception in this
        code, then the deferred will errback, but if there is a remote error
        such as the server failing or the returned data being incorrect then
        it will not errback -- it will fire normally with the indicated
        results."""
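
        # Outcome summary (restating the docstring above):
        #   (True,  sharenum, None)           -> fully verified
        #   (False, sharenum, 'incompatible') -> unknown share version
        #   (False, sharenum, 'corrupt')      -> failed a hash check
        #   (False, sharenum, 'disconnect')   -> server went away
        #   (False, sharenum, 'failure')      -> server returned a Failure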

        vcap = self._verifycap
        b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
        veup = ValidatedExtendedURIProxy(b, vcap)
        d = veup.start()

        def _got_ueb(vup):
            share_hash_tree = IncompleteHashTree(vcap.total_shares)
            share_hash_tree.set_hashes({0: vup.share_root_hash})

            vrbp = ValidatedReadBucketProxy(sharenum, b,
                                            share_hash_tree,
                                            vup.num_segments,
                                            vup.block_size,
                                            vup.share_size)

            # note: normal download doesn't use get_all_sharehashes(),
            # because it gets more data than necessary. We've discussed the
            # security properties of having verification and download look
            # identical (so the server couldn't, say, provide good responses
            # for one and not the other), but I think that full verification
            # is more important than defending against inconsistent server
            # behavior. Besides, they can't pass the verifier without storing
            # all the data, so there's not so much to be gained by behaving
            # inconsistently.
            d = vrbp.get_all_sharehashes()
            # we fill share_hash_tree before fetching any blocks, so the
            # block fetches won't send redundant share-hash-tree requests, to
            # speed things up. Then we fetch+validate all the blockhashes.
            d.addCallback(lambda ign: vrbp.get_all_blockhashes())

            cht = IncompleteHashTree(vup.num_segments)
            cht.set_hashes({0: vup.crypttext_root_hash})
            d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))

            d.addCallback(lambda ign: vrbp)
            return d
        d.addCallback(_got_ueb)

        def _discard_result(r):
            assert isinstance(r, str), r
            # to free up the RAM
            return None

        def _get_blocks(vrbp):
            def _get_block(ign, blocknum):
                db = vrbp.get_block(blocknum)
                db.addCallback(_discard_result)
                return db

            dbs = defer.succeed(None)
            for blocknum in range(veup.num_segments):
                dbs.addCallback(_get_block, blocknum)

            # The Deferred we return will fire after every block of this
            # share has been downloaded and verified successfully, or else it
            # will errback as soon as the first error is observed.
            return dbs

        d.addCallback(_get_blocks)

        # if none of those errbacked, the blocks (and the hashes above them)
        # are good
        def _all_good(ign):
            return (True, sharenum, None)
        d.addCallback(_all_good)

        # but if anything fails, we'll land here
        def _errb(f):
            # We didn't succeed at fetching and verifying all the blocks of
            # this share. Handle each reason for failure differently.

            if f.check(DeadReferenceError):
                return (False, sharenum, 'disconnect')
            elif f.check(RemoteException):
                return (False, sharenum, 'failure')
            elif f.check(layout.ShareVersionIncompatible):
                return (False, sharenum, 'incompatible')
            elif f.check(layout.LayoutInvalid,
                         layout.RidiculouslyLargeURIExtensionBlock,
                         BadOrMissingHash,
                         BadURIExtensionHashValue):
                return (False, sharenum, 'corrupt')

            # if it wasn't one of those reasons, re-raise the error
            return f
        d.addErrback(_errb)

        return d

    def _verify_server_shares(self, s):
        """ Return a deferred which eventually fires with a tuple of
        (set(sharenum), server, set(corruptsharenum),
        set(incompatiblesharenum), success) showing all the shares verified
        to be served by this server, and all the corrupt shares served by the
        server, and all the incompatible shares served by the server. In case
        the server is disconnected or returns a Failure then it fires with
        the last element False.

        A server disconnecting or returning a failure when we ask it for
        shares is the same, for our purposes, as a server that says it has
        none or offers invalid ones, except that we want to track and report
        the server's behavior. Similarly, the presence of corrupt shares is
        mainly of use for diagnostics -- you can typically treat it as just
        like having no share at all, by observing its absence from the
        verified shares dict and ignoring its presence in the corrupt shares
        dict.

        The 'success' element means whether the server responded to *any*
        queries during this process, so if it responded to some queries and
        then disconnected and ceased responding, or returned a failure, it is
        still marked with the True flag for 'success'.
        """
        d = self._get_buckets(s, self._verifycap.get_storage_index())

        def _got_buckets(result):
            bucketdict, success = result

            shareverds = []
            for (sharenum, bucket) in bucketdict.items():
                d = self._download_and_verify(s, sharenum, bucket)
                shareverds.append(d)

            dl = deferredutil.gatherResults(shareverds)

            def collect(results):
                verified = set()
                corrupt = set()
                incompatible = set()
                for succ, sharenum, whynot in results:
                    if succ:
                        verified.add(sharenum)
                    else:
                        if whynot == 'corrupt':
                            corrupt.add(sharenum)
                        elif whynot == 'incompatible':
                            incompatible.add(sharenum)
                return (verified, s, corrupt, incompatible, success)

            dl.addCallback(collect)
            return dl

        def _err(f):
            f.trap(RemoteException, DeadReferenceError)
            return (set(), s, set(), set(), False)

        d.addCallbacks(_got_buckets, _err)
        return d

    def _check_server_shares(self, s):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), server, set(), set(), responded) showing all the
        shares claimed to be served by this server. In case the server is
        disconnected then it fires with (set(), server, set(), set(), False)
        (a server disconnecting when we ask it for buckets is the same, for
        our purposes, as a server that says it has none, except that we want
        to track and report whether or not each server responded.)"""
        def _curry_empty_corrupted(res):
            buckets, responded = res
            return (set(buckets), s, set(), set(), responded)
        d = self._get_buckets(s, self._verifycap.get_storage_index())
        d.addCallback(_curry_empty_corrupted)
        return d

    def _format_results(self, results):
        SI = self._verifycap.get_storage_index()

        verifiedshares = dictutil.DictOfSets() # {sharenum: set(server)}
        servers = {} # {server: set(sharenums)}
        corruptshare_locators = [] # (server, storageindex, sharenum)
        incompatibleshare_locators = [] # (server, storageindex, sharenum)
        servers_responding = set() # server

        for verified, server, corrupt, incompatible, responded in results:
            servers.setdefault(server, set()).update(verified)
            for sharenum in verified:
                verifiedshares.setdefault(sharenum, set()).add(server)
            for sharenum in corrupt:
                corruptshare_locators.append((server, SI, sharenum))
            for sharenum in incompatible:
                incompatibleshare_locators.append((server, SI, sharenum))
            if responded:
                servers_responding.add(server)

        good_share_hosts = len([s for s in servers.keys() if servers[s]])

        assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
        if len(verifiedshares) == self._verifycap.total_shares:
            healthy = True
            summary = "Healthy"
        else:
            healthy = False
            summary = ("Not Healthy: %d shares (enc %d-of-%d)" %
                       (len(verifiedshares),
                        self._verifycap.needed_shares,
                        self._verifycap.total_shares))
        if len(verifiedshares) >= self._verifycap.needed_shares:
            recoverable = 1
            unrecoverable = 0
        else:
            recoverable = 0
            unrecoverable = 1

        # The file needs rebalancing if the number of servers that hold at
        # least one share is smaller than the number of uniquely-numbered
        # shares available.
        needs_rebalancing = bool(good_share_hosts < len(verifiedshares))
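        # Worked example (numbers invented for illustration): if shares 0..9
        # were all verified (ten uniquely-numbered shares) but only seven
        # servers hold at least one of them, then good_share_hosts (7) <
        # len(verifiedshares) (10), so the file needs rebalancing.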

        cr = CheckResults(self._verifycap, SI,
                          healthy=healthy, recoverable=bool(recoverable),
                          needs_rebalancing=needs_rebalancing,
                          count_shares_needed=self._verifycap.needed_shares,
                          count_shares_expected=self._verifycap.total_shares,
                          count_shares_good=len(verifiedshares),
                          count_good_share_hosts=good_share_hosts,
                          count_recoverable_versions=recoverable,
                          count_unrecoverable_versions=unrecoverable,
                          servers_responding=list(servers_responding),
                          sharemap=verifiedshares,
                          count_wrong_shares=0, # no such thing, for immutable
                          list_corrupt_shares=corruptshare_locators,
                          count_corrupt_shares=len(corruptshare_locators),
                          list_incompatible_shares=incompatibleshare_locators,
                          count_incompatible_shares=len(incompatibleshare_locators),
                          summary=summary,
                          report=[],
                          share_problems=[],
                          servermap=None)

        return cr

    def start(self):
        ds = []
        if self._verify:
            for s in self._servers:
                ds.append(self._verify_server_shares(s))
        else:
            for s in self._servers:
                ds.append(self._check_server_shares(s))

        return deferredutil.gatherResults(ds).addCallback(self._format_results)
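
# Hedged usage sketch (not part of this module; 'verifycap', 'servers',
# 'secret_holder', and 'monitor' are assumed to be supplied by the caller):
#
#   c = Checker(verifycap, servers, verify=False, add_lease=False,
#               secret_holder=secret_holder, monitor=monitor)
#   d = c.start()
#   d.addCallback(lambda cr: cr.is_healthy())
#
# start() fires with a CheckResults instance (see _format_results above).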