from zope.interface import implements
from twisted.internet import defer
from foolscap.api import DeadReferenceError, RemoteException
from allmydata import hashtree, codec, uri
from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
from allmydata.hashtree import IncompleteHashTree
from allmydata.check_results import CheckResults
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, dictutil, log, mathutil
from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
     block_hash

from allmydata.immutable import layout

class IntegrityCheckReject(Exception):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass

class ValidatedExtendedURIProxy:
    """I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB."""
    implements(IValidatedThingProxy)

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        h = uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy,
                    base32.b2a(self._verifycap.uri_extension_hash),
                    base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data

    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.
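        #
        # For orientation, a hypothetical unpacked UEB dict (illustrative
        # field values, not an exhaustive list) might look like:
        #
        #   { 'segment_size': 131072,            # required
        #     'crypttext_root_hash': <32 bytes>, # required
        #     'share_root_hash': <32 bytes>,     # required
        #     'crypttext_hash': <32 bytes>,      # optional, non-redundant
        #     'codec_name': 'crs',               # optional, redundant
        #     'size': 1000000,                   # optional, redundant
        #     'num_segments': 8 }                # optional, redundant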

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)
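        # A worked example with illustrative numbers: if size=1000000,
        # segment_size=131072, and needed_shares=3, then:
        #   block_size        = div_ceil(131072, 3)       = 43691
        #   num_segments      = div_ceil(1000000, 131072) = 8
        #   tail_data_size    = 1000000 % 131072          = 82496
        #   tail_segment_size = next_multiple(82496, 3)   = 82497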

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']


        # Next: things that are optional and not redundant: crypttext_hash
        if d.has_key('crypttext_hash'):
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))


        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if d.has_key('codec_name'):
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if d.has_key('codec_params'):
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if d.has_key('tail_codec_params'):
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('num_segments'):
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if d.has_key('size'):
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if d.has_key('needed_shares'):
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if d.has_key('total_shares'):
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used.  Ignoring.  %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used.  Ignoring.  %s" % (self,))

        return self

    def start(self):
        """Fetch the UEB from the bucket, compare its hash to the hash from
        the verifycap, then parse it. Return a Deferred that is called back
        with self once the fetch succeeds, or errbacked if it fails."""
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d
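
    # A minimal usage sketch (illustrative; assumes 'rbp' is a
    # layout.ReadBucketProxy already attached to a remote bucket and that
    # 'verifycap' provides IVerifierURI):
    #
    #   veup = ValidatedExtendedURIProxy(rbp, verifycap)
    #   d = veup.start()
    #   d.addCallback(lambda vup: (vup.segment_size, vup.share_root_hash))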

class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size):
        """share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB."""
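        # For example, the Checker's _download_and_verify() (below) seeds the
        # tree from a validated UEB proxy ('vup') before constructing me:
        #
        #   share_hash_tree = IncompleteHashTree(total_shares)
        #   share_hash_tree.set_hashes({0: vup.share_root_hash})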
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket,
                               base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self,
                                       facility="tahoe.immutable.download",
                                       prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

    def get_all_sharehashes(self):
        """Retrieve and validate all the share-hash-tree nodes that are
        included in this share, regardless of whether we need them to
        validate the share or not. Each share contains a minimal Merkle tree
        chain, but there is lots of overlap, so usually we'll be using hashes
        from other shares and not reading every single hash from this share.
        The Verifier uses this function to read and validate every single
        hash from this share.

        Call this (and wait for the Deferred it returns to fire) before
        calling get_block() for the first time: this lets us check that the
        share contains enough hashes to validate its own data, and avoids
        downloading any share hash twice.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""

        d = self.bucket.get_share_hashes()
        def _got_share_hashes(sh):
            sharehashes = dict(sh)
            try:
                self.share_hash_tree.set_hashes(sharehashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_share_hashes)
        return d

    def get_all_blockhashes(self):
        """Retrieve and validate all the block-hash-tree nodes that are
        included in this share. Each share contains a full Merkle tree, but
        we usually only fetch the minimal subset necessary for any particular
        block. This function fetches everything at once. The Verifier uses
        this function to validate the block hash tree.

        Call this (and wait for the Deferred it returns to fire) after
        calling get_all_sharehashes() and before calling get_block() for the
        first time: this lets us check that the share contains all block
        hashes and avoids downloading them multiple times.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash.
        """

        # get_block_hashes(anything) currently always returns everything
        needed = list(range(len(self.block_hash_tree)))
        d = self.bucket.get_block_hashes(needed)
        def _got_block_hashes(blockhashes):
            if len(blockhashes) < len(self.block_hash_tree):
                raise BadOrMissingHash()
            bh = dict(enumerate(blockhashes))

            try:
                self.block_hash_tree.set_hashes(bh)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_block_hashes)
        return d

    def get_all_crypttext_hashes(self, crypttext_hash_tree):
        """Retrieve and validate all the crypttext-hash-tree nodes that are
        in this share. Normally we don't look at these at all: the download
        process fetches them incrementally as needed to validate each segment
        of ciphertext. But this is a convenient place to give the Verifier a
        function to validate all of these at once.

        Call this with a new hashtree object for each share, initialized with
        the crypttext hash tree root. I return a Deferred which errbacks upon
        failure, probably with BadOrMissingHash.
        """
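        # For example, the Checker's _download_and_verify() (below) builds
        # and seeds a fresh tree for each share:
        #
        #   cht = IncompleteHashTree(num_segments)
        #   cht.set_hashes({0: vup.crypttext_root_hash})
        #   d = vrbp.get_all_crypttext_hashes(cht)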

        # get_crypttext_hashes() always returns everything
        d = self.bucket.get_crypttext_hashes()
        def _got_crypttext_hashes(hashes):
            if len(hashes) < len(crypttext_hash_tree):
                raise BadOrMissingHash()
            ct_hashes = dict(enumerate(hashes))
            try:
                crypttext_hash_tree.set_hashes(ct_hashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_crypttext_hashes)
        return d

    def get_block(self, blocknum):
        # The first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
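        # Illustrative tail-block arithmetic: with share_size=100 and
        # block_size=30, the last block holds 100 % 30 = 10 bytes; if the
        # share were an exact multiple (e.g. share_size=90), the remainder
        # would be 0 and the last block is a full block_size of 30.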
        d3 = self.bucket.get_block_data(blocknum,
                                        self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks,
                     self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise an exception if the values being passed do
                # not match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leaves of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree and also the share hash
                    # wasn't sent by the server.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #        "%r .. %r: %s" %
            #        (self.sharenum, blocknum, len(blockdata),
            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                    (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                        (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata


class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available (where M is the file's total number of shares).

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """
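
    # A minimal usage sketch (illustrative: 'servers', 'secret_holder', and
    # 'monitor' are assumed to be supplied by the client/filenode code, and
    # is_healthy() is a CheckResults accessor):
    #
    #   c = Checker(verifycap, servers, verify=False, add_lease=False,
    #               secret_holder=secret_holder, monitor=monitor)
    #   d = c.start()
    #   d.addCallback(lambda cr: cr.is_healthy())
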
    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                 monitor):
        precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))

        prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)

        self._verifycap = verifycap

        self._monitor = monitor
        self._servers = servers
        self._verify = verify # bool: verify what the servers claim, or not?
        self._add_lease = add_lease

        frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
                                       self._verifycap.get_storage_index())
        self.file_renewal_secret = frs
        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
                                      self._verifycap.get_storage_index())
        self.file_cancel_secret = fcs

    def _get_renewal_secret(self, seed):
        return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
    def _get_cancel_secret(self, seed):
        return bucket_cancel_secret_hash(self.file_cancel_secret, seed)

    def _get_buckets(self, s, storageindex):
        """Return a deferred that eventually fires with ({sharenum: bucket},
        serverid, success). In case the server is disconnected or returns a
        Failure, it fires with ({}, serverid, False). (A server disconnecting
        or returning a Failure when we ask it for buckets is the same, for
        our purposes, as a server that says it has none, except that we want
        to track and report whether or not each server responded.)"""

        rref = s.get_rref()
        lease_seed = s.get_lease_seed()
        serverid = s.get_serverid()
        if self._add_lease:
            renew_secret = self._get_renewal_secret(lease_seed)
            cancel_secret = self._get_cancel_secret(lease_seed)
            d2 = rref.callRemote("add_lease", storageindex,
                                 renew_secret, cancel_secret)
            d2.addErrback(self._add_lease_failed, s.name(), storageindex)
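            # Note that d2 is deliberately fire-and-forget: it is never
            # chained into the result below, so a slow or failed add-lease
            # cannot delay or break the get_buckets query;
            # _add_lease_failed() just logs anything unexpected.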

        d = rref.callRemote("get_buckets", storageindex)
        def _wrap_results(res):
            return (res, serverid, True)

        def _trap_errs(f):
            level = log.WEIRD
            if f.check(DeadReferenceError):
                level = log.UNUSUAL
            self.log("failure from server on 'get_buckets' the REMOTE failure was:",
                     facility="tahoe.immutable.checker",
                     failure=f, level=level, umid="AX7wZQ")
            return ({}, serverid, False)

        d.addCallbacks(_wrap_results, _trap_errs)
        return d

    def _add_lease_failed(self, f, server_name, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # We want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # problems.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server_name,
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="atbAxw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server_name,
                f_value=str(f.value),
                level=log.WEIRD, umid="hEGuQg")


    def _download_and_verify(self, serverid, sharenum, bucket):
        """Start an attempt to download and verify every block in this bucket
        and return a deferred that will eventually fire once the attempt
        completes.

        If we download and verify every block, fire with (True, sharenum,
        None). Otherwise fire with (False, sharenum, reason), where reason is
        'incompatible' if the share data couldn't be parsed because it was of
        an unknown version number, 'corrupt' if any of the blocks were
        invalid, 'disconnect' if the server disconnected, or 'failure' if the
        server returned a Failure during the process.

        If there is an internal error such as an uncaught exception in this
        code, then the deferred will errback, but if there is a remote error
        such as the server failing or the returned data being incorrect then
        it will not errback -- it will fire normally with the indicated
        results."""

        vcap = self._verifycap
        b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index())
        veup = ValidatedExtendedURIProxy(b, vcap)
        d = veup.start()

        def _got_ueb(vup):
            share_hash_tree = IncompleteHashTree(vcap.total_shares)
            share_hash_tree.set_hashes({0: vup.share_root_hash})

            vrbp = ValidatedReadBucketProxy(sharenum, b,
                                            share_hash_tree,
                                            vup.num_segments,
                                            vup.block_size,
                                            vup.share_size)

            # note: normal download doesn't use get_all_sharehashes(),
            # because it gets more data than necessary. We've discussed the
            # security properties of having verification and download look
            # identical (so the server couldn't, say, provide good responses
            # for one and not the other), but I think that full verification
            # is more important than defending against inconsistent server
            # behavior. Besides, they can't pass the verifier without storing
            # all the data, so there's not so much to be gained by behaving
            # inconsistently.
            d = vrbp.get_all_sharehashes()
            # we fill share_hash_tree before fetching any blocks, so the
            # block fetches won't send redundant share-hash-tree requests, to
            # speed things up. Then we fetch+validate all the blockhashes.
            d.addCallback(lambda ign: vrbp.get_all_blockhashes())

            cht = IncompleteHashTree(vup.num_segments)
            cht.set_hashes({0: vup.crypttext_root_hash})
            d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))

            d.addCallback(lambda ign: vrbp)
            return d
        d.addCallback(_got_ueb)

        def _discard_result(r):
            assert isinstance(r, str), r
            # to free up the RAM
            return None
        def _get_blocks(vrbp):
            ds = []
            for blocknum in range(veup.num_segments):
                db = vrbp.get_block(blocknum)
                db.addCallback(_discard_result)
                ds.append(db)
            # this gatherResults will fire once every block of this share has
            # been downloaded and verified, or else it will errback.
            return deferredutil.gatherResults(ds)
        d.addCallback(_get_blocks)

        # if none of those errbacked, the blocks (and the hashes above them)
        # are good
        def _all_good(ign):
            return (True, sharenum, None)
        d.addCallback(_all_good)

        # but if anything fails, we'll land here
        def _errb(f):
            # We didn't succeed at fetching and verifying all the blocks of
            # this share. Handle each reason for failure differently.

            if f.check(DeadReferenceError):
                return (False, sharenum, 'disconnect')
            elif f.check(RemoteException):
                return (False, sharenum, 'failure')
            elif f.check(layout.ShareVersionIncompatible):
                return (False, sharenum, 'incompatible')
            elif f.check(layout.LayoutInvalid,
                         layout.RidiculouslyLargeURIExtensionBlock,
                         BadOrMissingHash,
                         BadURIExtensionHashValue):
                return (False, sharenum, 'corrupt')

            # if it wasn't one of those reasons, re-raise the error
            return f
        d.addErrback(_errb)

        return d

    def _verify_server_shares(self, s):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), serverid, set(corruptsharenum),
        set(incompatiblesharenum), success) showing all the shares verified
        to be served by this server, all the corrupt shares served by the
        server, and all the incompatible shares served by the server. In case
        the server is disconnected or returns a Failure then it fires with
        the last element False.

        A server disconnecting or returning a failure when we ask it for
        shares is the same, for our purposes, as a server that says it has
        none or offers invalid ones, except that we want to track and report
        the server's behavior. Similarly, the presence of corrupt shares is
        mainly of use for diagnostics -- you can typically treat it as just
        like having no share at all, by observing its absence from the
        verified shares dict and ignoring its presence in the corrupt shares
        dict.

        The 'success' argument means whether the server responded to *any*
        queries during this process, so if it responded to some queries and
        then disconnected and ceased responding, or returned a failure, it is
        still marked with the True flag for 'success'.
        """
        d = self._get_buckets(s, self._verifycap.get_storage_index())

        def _got_buckets(result):
            bucketdict, serverid, success = result

            shareverds = []
            for (sharenum, bucket) in bucketdict.items():
                d = self._download_and_verify(serverid, sharenum, bucket)
                shareverds.append(d)

            dl = deferredutil.gatherResults(shareverds)

            def collect(results):
                verified = set()
                corrupt = set()
                incompatible = set()
                for succ, sharenum, whynot in results:
                    if succ:
                        verified.add(sharenum)
                    else:
                        if whynot == 'corrupt':
                            corrupt.add(sharenum)
                        elif whynot == 'incompatible':
                            incompatible.add(sharenum)
                return (verified, serverid, corrupt, incompatible, success)

            dl.addCallback(collect)
            return dl

        def _err(f):
            f.trap(RemoteException, DeadReferenceError)
            return (set(), s.get_serverid(), set(), set(), False)

        d.addCallbacks(_got_buckets, _err)
        return d

    def _check_server_shares(self, s):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), serverid, set(), set(), responded) showing all the
        shares claimed to be served by this server. In case the server is
        disconnected then it fires with (set(), serverid, set(), set(),
        False). (A server disconnecting when we ask it for buckets is the
        same, for our purposes, as a server that says it has none, except
        that we want to track and report whether or not each server
        responded.)"""
        def _curry_empty_corrupted(res):
            buckets, serverid, responded = res
            return (set(buckets), serverid, set(), set(), responded)
        d = self._get_buckets(s, self._verifycap.get_storage_index())
        d.addCallback(_curry_empty_corrupted)
        return d

    def _format_results(self, results):
        cr = CheckResults(self._verifycap, self._verifycap.get_storage_index())
        d = {}
        d['count-shares-needed'] = self._verifycap.needed_shares
        d['count-shares-expected'] = self._verifycap.total_shares

        verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
        servers = {} # {serverid: set(sharenums)}
        corruptsharelocators = [] # (serverid, storageindex, sharenum)
        incompatiblesharelocators = [] # (serverid, storageindex, sharenum)

        for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
            servers.setdefault(thisserverid, set()).update(theseverifiedshares)
            for sharenum in theseverifiedshares:
                verifiedshares.setdefault(sharenum, set()).add(thisserverid)
            for sharenum in thesecorruptshares:
                corruptsharelocators.append((thisserverid, self._verifycap.get_storage_index(), sharenum))
            for sharenum in theseincompatibleshares:
                incompatiblesharelocators.append((thisserverid, self._verifycap.get_storage_index(), sharenum))

        d['count-shares-good'] = len(verifiedshares)
        d['count-good-share-hosts'] = len([s for s in servers.keys() if servers[s]])

        assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
        if len(verifiedshares) == self._verifycap.total_shares:
            cr.set_healthy(True)
            cr.set_summary("Healthy")
        else:
            cr.set_healthy(False)
            cr.set_summary("Not Healthy: %d shares (enc %d-of-%d)" %
                           (len(verifiedshares),
                            self._verifycap.needed_shares,
                            self._verifycap.total_shares))
        if len(verifiedshares) >= self._verifycap.needed_shares:
            cr.set_recoverable(True)
            d['count-recoverable-versions'] = 1
            d['count-unrecoverable-versions'] = 0
        else:
            cr.set_recoverable(False)
            d['count-recoverable-versions'] = 0
            d['count-unrecoverable-versions'] = 1

        d['servers-responding'] = list(servers)
        d['sharemap'] = verifiedshares
        # no such thing as wrong shares of an immutable file
        d['count-wrong-shares'] = 0
        d['list-corrupt-shares'] = corruptsharelocators
        d['count-corrupt-shares'] = len(corruptsharelocators)
        d['list-incompatible-shares'] = incompatiblesharelocators
        d['count-incompatible-shares'] = len(incompatiblesharelocators)


        # The file needs rebalancing if the number of servers that have at
        # least one share is less than the number of uniquely-numbered shares
        # available.
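        # For example (illustrative numbers): if 10 distinct shares were
        # verified but they all live on just 3 servers, then
        # count-good-share-hosts (3) < count-shares-good (10), so the file
        # needs rebalancing; if the same 10 shares were spread over 10
        # servers, it would not.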
        cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])

        cr.set_data(d)

        return cr

    def start(self):
        ds = []
        if self._verify:
            for s in self._servers:
                ds.append(self._verify_server_shares(s))
        else:
            for s in self._servers:
                ds.append(self._check_server_shares(s))

        return deferredutil.gatherResults(ds).addCallback(self._format_results)