import random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import DeadReferenceError, RemoteException, eventually

from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \
     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
     UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES
class IntegrityCheckReject(Exception):
    pass

class BadURIExtensionHashValue(IntegrityCheckReject):
    pass

class BadURIExtension(IntegrityCheckReject):
    pass

class UnsupportedErasureCodec(BadURIExtension):
    pass

class BadCrypttextHashValue(IntegrityCheckReject):
    pass

class BadOrMissingHash(IntegrityCheckReject):
    pass

class DownloadStopped(Exception):
    pass
class DownloadResults:
    implements(IDownloadResults)

    def __init__(self):
        self.servers_used = set()
        self.server_problems = {}
        self.servermap = {}
        self.timings = {}
        self.file_size = None
class DecryptingTarget(log.PrefixingLogMixin):
    implements(IDownloadTarget, IConsumer)
    def __init__(self, target, key, _log_msg_id=None):
        precondition(IDownloadTarget.providedBy(target), target)
        self.target = target
        self._decryptor = AES(key)
        prefix = str(target)
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
    # methods to satisfy the IConsumer interface
    def registerProducer(self, producer, streaming):
        if IConsumer.providedBy(self.target):
            self.target.registerProducer(producer, streaming)
    def unregisterProducer(self):
        if IConsumer.providedBy(self.target):
            self.target.unregisterProducer()
    def write(self, ciphertext):
        plaintext = self._decryptor.process(ciphertext)
        self.target.write(plaintext)
    def open(self, size):
        self.target.open(size)
    def close(self):
        self.target.close()
    def finish(self):
        return self.target.finish()
    # The following methods are just to pass through to the next target, and
    # just because that target might be a repairer.DownUpConnector, and just
    # because the current CHKUpload object expects to find the storage index
    # and encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        self.target.set_storageindex(storageindex)
    def set_encodingparams(self, encodingparams):
        self.target.set_encodingparams(encodingparams)
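
# Editorial note: pycryptopp's AES here is a CTR-mode cipher whose keystream
# position advances with each process() call, so DecryptingTarget.write() can
# decrypt incrementally as long as ciphertext arrives strictly in order. A
# hypothetical usage sketch (FileTarget and the chunk source are illustrative,
# not part of this module):
#
#     target = DecryptingTarget(FileTarget("out.dat"), readcap.key)
#     target.open(size)
#     for chunk in ciphertext_chunks:  # must be fed in order
#         target.write(chunk)          # decrypted bytes go to FileTarget
#     target.close()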
class ValidatedThingObtainer:
    def __init__(self, validatedthingproxies, debugname, log_id):
        self._validatedthingproxies = validatedthingproxies
        self._debugname = debugname
        self._log_id = log_id

    def _bad(self, f, validatedthingproxy):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject, layout.LayoutInvalid,
                          layout.ShareVersionIncompatible)
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        elif f.check(RemoteException):
            level = log.WEIRD
        else:
            level = log.SCARY
        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
                failure=f, level=level, umid="JGXxBA")
        if not self._validatedthingproxies:
            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
        # try again with a different one
        d = self._try_the_next_one()
        return d

    def _try_the_next_one(self):
        vtp = self._validatedthingproxies.pop(0)
        # start() obtains, validates, and callsback-with the thing or else
        # errbacks
        d = vtp.start()
        d.addErrback(self._bad, vtp)
        return d

    def start(self):
        return self._try_the_next_one()
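
# Editorial note: ValidatedThingObtainer is a simple failover driver: it pops
# proxies off the list one at a time, and any of the trapped failure types
# (remote errors, integrity rejections, layout problems) causes it to move on
# to the next proxy; only when the list is exhausted does it raise
# UnableToFetchCriticalDownloadDataError. Sketch (proxy_a/proxy_b stand for
# hypothetical IValidatedThingProxy providers):
#
#     vto = ValidatedThingObtainer([proxy_a, proxy_b], debugname="example",
#                                  log_id=None)
#     d = vto.start()  # fires with the first thing that validates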
class ValidatedCrypttextHashTreeProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote crypttext hash tree using a local
    ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
    Validated Thing protocol (i.e., I have a start() method that fires with
    self once I get a valid one)."""
    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
                 fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._readbucketproxy = readbucketproxy
        self._num_segments = num_segments
        self._fetch_failures = fetch_failures
        self._crypttext_hash_tree = crypttext_hash_tree

    def _validate(self, proposal):
        ct_hashes = dict(list(enumerate(proposal)))
        try:
            self._crypttext_hash_tree.set_hashes(ct_hashes)
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            if self._fetch_failures is not None:
                self._fetch_failures["crypttext_hash_tree"] += 1
            raise BadOrMissingHash(le)
        # If we now have enough of the crypttext hash tree to integrity-check
        # *any* segment of ciphertext, then we are done. TODO: It would have
        # better alacrity if we downloaded only part of the crypttext hash
        # tree at a time.
        for segnum in range(self._num_segments):
            if self._crypttext_hash_tree.needed_hashes(segnum):
                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
        return self

    def start(self):
        d = self._readbucketproxy.get_crypttext_hashes()
        d.addCallback(self._validate)
        return d
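
# Editorial note: the validation in _validate() above relies on
# IncompleteHashTree.set_hashes() checking every proposed hash against the
# already-pinned root (node 0), so a bucket that returns corrupted crypttext
# hashes fails with BadOrMissingHash, and the ValidatedThingObtainer driving
# this proxy simply tries the next bucket.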
class ValidatedExtendedURIProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB."""

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
    def _check_integrity(self, data):
        h = hashutil.uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy,
                    base32.b2a(self._verifycap.uri_extension_hash),
                    base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data
    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)
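
        # Editorial worked example (assuming size=1000000 bytes, k=3 needed
        # shares, and segment_size=131072, the usual 128KiB default):
        #   num_segments      = div_ceil(1000000, 131072) = 8
        #   tail_data_size    = 1000000 % 131072          = 82496
        #   tail_segment_size = next_multiple(82496, 3)   = 82497
        #   block_size        = div_ceil(131072, 3)       = 43691
        #   share_size        = div_ceil(1000000, 3)      = 333334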
        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']


        # Next: things that are optional and not redundant: crypttext_hash
        if d.has_key('crypttext_hash'):
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))


        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if d.has_key('codec_name'):
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if d.has_key('codec_params'):
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if d.has_key('tail_codec_params'):
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('num_segments'):
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if d.has_key('size'):
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if d.has_key('needed_shares'):
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if d.has_key('total_shares'):
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used. Ignoring. %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used. Ignoring. %s" % (self,))

        return self
324 """Fetch the UEB from bucket, compare its hash to the hash from
325 verifycap, then parse it. Returns a deferred which is called back
326 with self once the fetch is successful, or is erred back if it
328 d = self._readbucketproxy.get_uri_extension()
329 d.addCallback(self._check_integrity)
330 d.addCallback(self._parse_and_validate)
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size):
        """ share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB."""
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket,
                               base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self,
                                       facility="tahoe.immutable.download",
                                       prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
    def get_all_sharehashes(self):
        """Retrieve and validate all the share-hash-tree nodes that are
        included in this share, regardless of whether we need them to
        validate the share or not. Each share contains a minimal Merkle tree
        chain, but there is lots of overlap, so usually we'll be using hashes
        from other shares and not reading every single hash from this share.
        The Verifier uses this function to read and validate every single
        hash from this share.

        Call this (and wait for the Deferred it returns to fire) before
        calling get_block() for the first time: this lets us check that the
        share contains enough hashes to validate its own data, and
        avoids downloading any share hash twice.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""
        d = self.bucket.get_share_hashes()
        def _got_share_hashes(sh):
            sharehashes = dict(sh)
            try:
                self.share_hash_tree.set_hashes(sharehashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_share_hashes)
        return d
    def get_all_blockhashes(self):
        """Retrieve and validate all the block-hash-tree nodes that are
        included in this share. Each share contains a full Merkle tree, but
        we usually only fetch the minimal subset necessary for any particular
        block. This function fetches everything at once. The Verifier uses
        this function to validate the block hash tree.

        Call this (and wait for the Deferred it returns to fire) after
        calling get_all_sharehashes() and before calling get_block() for the
        first time: this lets us check that the share contains all block
        hashes and avoids downloading them multiple times.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""

        # get_block_hashes(anything) currently always returns everything
        needed = list(range(len(self.block_hash_tree)))
        d = self.bucket.get_block_hashes(needed)
        def _got_block_hashes(blockhashes):
            if len(blockhashes) < len(self.block_hash_tree):
                raise BadOrMissingHash()
            bh = dict(enumerate(blockhashes))
            try:
                self.block_hash_tree.set_hashes(bh)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_block_hashes)
        return d
    def get_all_crypttext_hashes(self, crypttext_hash_tree):
        """Retrieve and validate all the crypttext-hash-tree nodes that are
        in this share. Normally we don't look at these at all: the download
        process fetches them incrementally as needed to validate each segment
        of ciphertext. But this is a convenient place to give the Verifier a
        function to validate all of these at once.

        Call this with a new hashtree object for each share, initialized with
        the crypttext hash tree root. I return a Deferred which errbacks upon
        failure, probably with BadOrMissingHash."""

        # get_crypttext_hashes() always returns everything
        d = self.bucket.get_crypttext_hashes()
        def _got_crypttext_hashes(hashes):
            if len(hashes) < len(crypttext_hash_tree):
                raise BadOrMissingHash()
            ct_hashes = dict(enumerate(hashes))
            try:
                crypttext_hash_tree.set_hashes(ct_hashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_crypttext_hashes)
        return d
    def get_block(self, blocknum):
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum,
                                        self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl
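
    # Editorial note: get_block() fetches only the minimal Merkle evidence for
    # one block: the share-hash chain is requested just once per bucket (the
    # needed_hashes() check above), and block hashes are requested per block
    # with include_leaf=True so that _got_data() below can compare the stored
    # leaf against the hash of the block data it actually received.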
    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks,
                     self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise exception if the values being passed do not
                # match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leafs of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree and also the share hash
                    # wasn't sent by the server.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = hashutil.block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #         "%r .. %r: %s" %
            #         (self.sharenum, blocknum, len(blockdata),
            #          blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                         (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata
class BlockDownloader(log.PrefixingLogMixin):
    """I am responsible for downloading a single block (from a single bucket)
    for a single segment.

    I am a child of the SegmentDownloader.
    """

    def __init__(self, vbucket, blocknum, parent, results):
        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
        prefix = "%s-%d" % (vbucket, blocknum)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.vbucket = vbucket
        self.blocknum = blocknum
        self.parent = parent
        self.results = results
    def start(self, segnum):
        self.log("get_block(segnum=%d)" % segnum)
        started = time.time()
        d = self.vbucket.get_block(segnum)
        d.addCallbacks(self._hold_block, self._got_block_error,
                       callbackArgs=(started,))
        return d

    def _hold_block(self, data, started):
        if self.results:
            elapsed = time.time() - started
            peerid = self.vbucket.bucket.get_peerid()
            if peerid not in self.results.timings["fetch_per_server"]:
                self.results.timings["fetch_per_server"][peerid] = []
            self.results.timings["fetch_per_server"][peerid].append(elapsed)
        self.log("got block")
        self.parent.hold_block(self.blocknum, data)

    def _got_block_error(self, f):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject,
                          layout.LayoutInvalid, layout.ShareVersionIncompatible)
        if f.check(RemoteException, DeadReferenceError):
            level = log.UNUSUAL
        else:
            level = log.WEIRD
        self.log("failure to get block", level=level, umid="5Z4uHQ")
        if self.results:
            peerid = self.vbucket.bucket.get_peerid()
            self.results.server_problems[peerid] = str(f)
        self.parent.bucket_failed(self.vbucket)
class SegmentDownloader:
    """I am responsible for downloading all the blocks for a single segment
    of data.

    I am a child of the CiphertextDownloader.
    """

    def __init__(self, parent, segmentnumber, needed_shares, results):
        self.parent = parent
        self.segmentnumber = segmentnumber
        self.needed_blocks = needed_shares
        self.blocks = {} # k: blocknum, v: data
        self.results = results
        self._log_number = self.parent.log("starting segment %d" %
                                           segmentnumber)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return self.parent.log(*args, **kwargs)
    def start(self):
        return self._download()

    def _download(self):
        d = self._try()
        def _done(res):
            if len(self.blocks) >= self.needed_blocks:
                # we only need self.needed_blocks blocks
                # we want to get the smallest blockids, because they are
                # more likely to be fast "primary blocks"
                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
                blocks = []
                for blocknum in blockids:
                    blocks.append(self.blocks[blocknum])
                return (blocks, blockids)
            else:
                return self._download()
        d.addCallback(_done)
        return d

    def _try(self):
        # fill our set of active buckets, maybe raising NotEnoughSharesError
        active_buckets = self.parent._activate_enough_buckets()
        # Now we have enough buckets, in self.parent.active_buckets.

        # in test cases, bd.start might mutate active_buckets right away, so
        # we need to put off calling start() until we've iterated all the way
        # through it.
        downloaders = []
        for blocknum, vbucket in active_buckets.iteritems():
            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
            bd = BlockDownloader(vbucket, blocknum, self, self.results)
            downloaders.append(bd)
            if self.results:
                self.results.servers_used.add(vbucket.bucket.get_peerid())
        l = [bd.start(self.segmentnumber) for bd in downloaders]
        return defer.DeferredList(l, fireOnOneErrback=True)
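
    # Editorial note: fireOnOneErrback=True makes the DeferredList fail fast,
    # but an individual block failure is normally absorbed before that:
    # _got_block_error() traps the expected failure types and calls
    # bucket_failed() on this SegmentDownloader, so _download() just loops
    # and calls _try() again with a replacement bucket.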
    def hold_block(self, blocknum, data):
        self.blocks[blocknum] = data

    def bucket_failed(self, vbucket):
        self.parent.bucket_failed(vbucket)
class DownloadStatus:
    implements(IDownloadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = 0.0
        self.paused = False
        self.stopped = False
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        status = self.status
        if self.paused:
            status += " (output paused)"
        if self.stopped:
            status += " (output stopped)"
        return status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_paused(self, paused):
        self.paused = paused
    def set_stopped(self, stopped):
        self.stopped = stopped
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value
class CiphertextDownloader(log.PrefixingLogMixin):
    """ I download shares, check their integrity, then decode them, check the
    integrity of the resulting ciphertext, then write it to my target.
    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method)."""
    implements(IPushProducer)

    def __init__(self, storage_broker, v, target, monitor):

        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
        precondition(IVerifierURI.providedBy(v), v)
        precondition(IDownloadTarget.providedBy(target), target)

        prefix=base32.b2a_l(v.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self._storage_broker = storage_broker

        self._verifycap = v
        self._storage_index = v.storage_index
        self._uri_extension_hash = v.uri_extension_hash

        self._started = time.time()
        self._status = s = DownloadStatus()
        s.set_status("Starting")
        s.set_storage_index(self._storage_index)
        s.set_size(self._verifycap.size)
        s.set_helper(False)
        s.set_active(True)

        self._results = DownloadResults()
        s.set_results(self._results)
        self._results.file_size = self._verifycap.size
        self._results.timings["servers_peer_selection"] = {}
        self._results.timings["fetch_per_server"] = {}
        self._results.timings["cumulative_fetch"] = 0.0
        self._results.timings["cumulative_decode"] = 0.0
        self._results.timings["cumulative_decrypt"] = 0.0
        self._results.timings["paused"] = 0.0

        self._paused = False
        self._stopped = False
        if IConsumer.providedBy(target):
            target.registerProducer(self, True)
        self._target = target
        # Repairer (uploader) needs the storageindex.
        self._target.set_storageindex(self._storage_index)
        self._monitor = monitor
        self._opened = False

        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets

        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }

        self._ciphertext_hasher = hashutil.crypttext_hasher()

        self._bytes_done = 0
        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)

        # _got_uri_extension() will create the following:
        # self._crypttext_hash_tree
        # self._share_hash_tree
        # self._current_segnum = 0
        # self._vup # ValidatedExtendedURIProxy
    def pauseProducing(self):
        if self._paused:
            return
        self._paused = defer.Deferred()
        self._paused_at = time.time()
        if self._status:
            self._status.set_paused(True)

    def resumeProducing(self):
        if self._paused:
            paused_for = time.time() - self._paused_at
            self._results.timings['paused'] += paused_for
            p = self._paused
            self._paused = None
            eventually(p.callback, None)
            if self._status:
                self._status.set_paused(False)

    def stopProducing(self):
        self.log("Download.stopProducing")
        self._stopped = True
        self.resumeProducing()
        if self._status:
            self._status.set_stopped(True)
            self._status.set_active(False)
    def start(self):
        self.log("starting download")

        # first step: who should we download from?
        d = defer.maybeDeferred(self._get_all_shareholders)
        d.addCallback(self._got_all_shareholders)
        # now get the uri_extension block from somebody and integrity check
        # it and parse and validate its contents
        d.addCallback(self._obtain_uri_extension)
        d.addCallback(self._get_crypttext_hash_tree)
        # once we know that, we can download blocks from everybody
        d.addCallback(self._download_all_segments)
        def _finished(res):
            if self._status:
                self._status.set_status("Finished")
                self._status.set_active(False)
                self._status.set_paused(False)
            if IConsumer.providedBy(self._target):
                self._target.unregisterProducer()
            return res
        d.addCallback(_finished)
        def _failed(why):
            if self._status:
                self._status.set_status("Failed")
                self._status.set_active(False)
            if why.check(DownloadStopped):
                # DownloadStopped just means the consumer aborted the
                # download; not so scary.
                self.log("download stopped", level=log.UNUSUAL)
            else:
                # This is really unusual, and deserves maximum forensics.
                self.log("download failed!", failure=why, level=log.SCARY,
                         umid="lp1vaQ")
            return why
        d.addErrback(_failed)
        d.addCallback(self._done)
        return d
    def _get_all_shareholders(self):
        dl = []
        sb = self._storage_broker
        servers = sb.get_servers_for_index(self._storage_index)
        if not servers:
            raise NoServersError("broker gave us no servers!")
        for (peerid,ss) in servers:
            self.log(format="sending DYHB to [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, umid="rT03hg")
            d = ss.callRemote("get_buckets", self._storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        self._responses_received = 0
        self._queries_sent = len(dl)
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        return defer.DeferredList(dl)
    def _got_response(self, buckets, peerid):
        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 shnums=sorted(buckets.keys()),
                 level=log.NOISY, umid="o4uwFg")
        self._responses_received += 1
        if self._results:
            elapsed = time.time() - self._started
            self._results.timings["servers_peer_selection"][peerid] = elapsed
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        for sharenum, bucket in buckets.iteritems():
            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
            self.add_share_bucket(sharenum, b)

            if self._results:
                if peerid not in self._results.servermap:
                    self._results.servermap[peerid] = set()
                self._results.servermap[peerid].add(sharenum)

    def add_share_bucket(self, sharenum, bucket):
        # this is split out for the benefit of test_encode.py
        self._share_buckets.append( (sharenum, bucket) )
    def _got_error(self, f):
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log("Error during get_buckets", failure=f, level=level,
                 umid="3uuBUQ")

    def bucket_failed(self, vbucket):
        shnum = vbucket.sharenum
        del self.active_buckets[shnum]
        s = self._share_vbuckets[shnum]
        # s is a set of ValidatedReadBucketProxy instances
        s.discard(vbucket)
        # ... which might now be empty
        if not s:
            # there are no more buckets which can provide this share, so
            # remove the key. This may prompt us to use a different share.
            del self._share_vbuckets[shnum]

    def _got_all_shareholders(self, res):
        if self._results:
            now = time.time()
            self._results.timings["peer_selection"] = now - self._started

        if len(self._share_buckets) < self._verifycap.needed_shares:
            msg = "Failed to get enough shareholders: have %d, need %d" \
                  % (len(self._share_buckets), self._verifycap.needed_shares)
            if self._share_buckets:
                raise NotEnoughSharesError(msg)
            else:
                raise NoSharesError(msg)

        #for s in self._share_vbuckets.values():
        #    for vb in s:
        #        assert isinstance(vb, ValidatedReadBucketProxy), \
        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
    def _obtain_uri_extension(self, ignored):
        # all shareholders are supposed to have a copy of uri_extension, and
        # all are supposed to be identical. We compute the hash of the data
        # that comes back, and compare it against the version in our URI. If
        # they don't match, ignore their data and try someone else.

        self._status.set_status("Obtaining URI Extension")

        uri_extension_fetch_started = time.time()

        vups = []
        for sharenum, bucket in self._share_buckets:
            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
        d = vto.start()

        def _got_uri_extension(vup):
            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
            if self._results:
                elapsed = time.time() - uri_extension_fetch_started
                self._results.timings["uri_extension"] = elapsed

            self._vup = vup
            self._codec = codec.CRSDecoder()
            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
            self._tail_codec = codec.CRSDecoder()
            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)

            self._current_segnum = 0

            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})

            # Repairer (uploader) needs the encodingparams.
            self._target.set_encodingparams((
                self._verifycap.needed_shares,
                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
                self._verifycap.total_shares,
                self._vup.segment_size
                ))
        d.addCallback(_got_uri_extension)
        return d
    def _get_crypttext_hash_tree(self, res):
        vchtps = []
        for sharenum, bucket in self._share_buckets:
            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
            vchtps.append(vchtp)

        _get_crypttext_hash_tree_started = time.time()
        if self._status:
            self._status.set_status("Retrieving crypttext hash tree")

        vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
                                     log_id=self._parentmsgid)
        d = vto.start()

        def _got_crypttext_hash_tree(res):
            # Good -- the self._crypttext_hash_tree that we passed to vchtp
            # is now populated with hashes.
            if self._results:
                elapsed = time.time() - _get_crypttext_hash_tree_started
                self._results.timings["hashtrees"] = elapsed
        d.addCallback(_got_crypttext_hash_tree)
        return d
    def _activate_enough_buckets(self):
        """either return a mapping from shnum to a ValidatedReadBucketProxy
        that can provide data for that share, or raise NotEnoughSharesError"""

        while len(self.active_buckets) < self._verifycap.needed_shares:
            # need some more
            handled_shnums = set(self.active_buckets.keys())
            available_shnums = set(self._share_vbuckets.keys())
            potential_shnums = list(available_shnums - handled_shnums)
            if len(potential_shnums) < (self._verifycap.needed_shares
                                        - len(self.active_buckets)):
                have = len(potential_shnums) + len(self.active_buckets)
                msg = "Unable to activate enough shares: have %d, need %d" \
                      % (have, self._verifycap.needed_shares)
                if have:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            # For the next share, choose a primary share if available, else a
            # randomly chosen secondary share.
            potential_shnums.sort()
            if potential_shnums[0] < self._verifycap.needed_shares:
                shnum = potential_shnums[0]
            else:
                shnum = random.choice(potential_shnums)
            # and a random bucket that will provide it
            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
            self.active_buckets[shnum] = validated_bucket
        return self.active_buckets
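
    # Editorial worked example: with needed_shares=3 and shares {0, 2, 5, 9}
    # available, the loop above activates 0, then 2 (primary shares, shnum < 3,
    # which decode fastest), then one of {5, 9} chosen at random.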
    def _download_all_segments(self, res):
        for sharenum, bucket in self._share_buckets:
            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        # after the above code, self._share_vbuckets contains enough
        # buckets to complete the download, and some extra ones to
        # tolerate some buckets dropping out or having
        # errors. self._share_vbuckets is a dictionary that maps from
        # shnum to a set of ValidatedBuckets, which themselves are
        # wrappers around RIBucketReader references.
        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance

        self._started_fetching = time.time()

        d = defer.succeed(None)
        for segnum in range(self._vup.num_segments):
            d.addCallback(self._download_segment, segnum)
            # this pause, at the end of write, prevents pre-fetch from
            # happening until the consumer is ready for more data.
            d.addCallback(self._check_for_pause)
        return d
    def _check_for_pause(self, res):
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ignored: d.callback(res))
            return d
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        self._monitor.raise_if_cancelled()
        return res
    def _download_segment(self, res, segnum):
        if self._status:
            self._status.set_status("Downloading segment %d of %d" %
                                    (segnum+1, self._vup.num_segments))
        self.log("downloading seg#%d of %d (%d%%)"
                 % (segnum, self._vup.num_segments,
                    100.0 * segnum / self._vup.num_segments))
        # memory footprint: when the SegmentDownloader finishes pulling down
        # all shares, we have 1*segment_size of usage.
        segmentdler = SegmentDownloader(self, segnum,
                                        self._verifycap.needed_shares,
                                        self._results)
        started = time.time()
        d = segmentdler.start()
        def _finished_fetching(res):
            elapsed = time.time() - started
            self._results.timings["cumulative_fetch"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_fetching)
        # pause before using more memory
        d.addCallback(self._check_for_pause)
        # while the codec does its job, we hit 2*segment_size
        def _started_decode(res):
            self._started_decode = time.time()
            return res
        if self._results:
            d.addCallback(_started_decode)
        if segnum + 1 == self._vup.num_segments:
            codec = self._tail_codec
        else:
            codec = self._codec
        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
        # once the codec is done, we drop back to 1*segment_size, because
        # 'shares' goes out of scope. The memory usage is all in the
        # plaintext now, spread out into a bunch of tiny buffers.
        def _finished_decode(res):
            elapsed = time.time() - self._started_decode
            self._results.timings["cumulative_decode"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_decode)

        # pause/check-for-stop just before writing, to honor stopProducing
        d.addCallback(self._check_for_pause)
        d.addCallback(self._got_segment)
        return d
    def _got_segment(self, buffers):
        precondition(self._crypttext_hash_tree)
        started_decrypt = time.time()
        self._status.set_progress(float(self._current_segnum)/self._verifycap.size)

        if self._current_segnum + 1 == self._vup.num_segments:
            # This is the last segment.
            # Trim off any padding added by the upload side. We never send
            # empty segments. If the data was an exact multiple of the
            # segment size, the last segment will be full.
            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
            # Remove buffers which don't contain any part of the tail.
            del buffers[num_buffers_used:]
            # Remove the past-the-tail-part of the last buffer.
            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
            if tail_in_last_buf == 0:
                tail_in_last_buf = tail_buf_size
            buffers[-1] = buffers[-1][:tail_in_last_buf]
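
            # Editorial worked example, continuing the numbers from
            # _parse_and_validate() (tail_segment_size=82497,
            # tail_data_size=82496, k=3):
            #   tail_buf_size    = div_ceil(82497, 3)     = 27499
            #   num_buffers_used = div_ceil(82496, 27499) = 3
            #   tail_in_last_buf = 82496 % 27499          = 27498
            # so all three buffers are kept, and the last is trimmed from
            # 27499 to 27498 bytes, leaving exactly 82496 bytes of tail data.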
        # First compute the hash of this segment and check that it fits.
        ch = hashutil.crypttext_segment_hasher()
        for buffer in buffers:
            self._ciphertext_hasher.update(buffer)
            ch.update(buffer)
        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})

        # Then write this segment to the target.
        if not self._opened:
            self._opened = True
            self._target.open(self._verifycap.size)

        for buffer in buffers:
            self._target.write(buffer)
            self._bytes_done += len(buffer)

        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
        self._current_segnum += 1

        if self._results:
            elapsed = time.time() - started_decrypt
            self._results.timings["cumulative_decrypt"] += elapsed
    def _done(self, res):
        self.log("download done")
        if self._results:
            now = time.time()
            self._results.timings["total"] = now - self._started
            self._results.timings["segments"] = now - self._started_fetching
        if self._vup.crypttext_hash:
            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
                    "bad crypttext_hash: computed=%s, expected=%s" %
                    (base32.b2a(self._ciphertext_hasher.digest()),
                     base32.b2a(self._vup.crypttext_hash)))
        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
        self._status.set_progress(1)
        self._target.close()
        return self._target.finish()

    def get_download_status(self):
        return self._status
class ConsumerAdapter:
    implements(IDownloadTarget, IConsumer)
    def __init__(self, consumer):
        self._consumer = consumer

    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def open(self, size):
        pass
    def write(self, data):
        self._consumer.write(data)
    def close(self):
        pass

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._consumer
    # The following methods are just because the target might be a
    # repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass
1233 """I am a service that allows file downloading.
1235 # TODO: in fact, this service only downloads immutable files (URI:CHK:).
1236 # It is scheduled to go away, to be replaced by filenode.download()
1237 implements(IDownloader)
1239 def __init__(self, storage_broker, stats_provider):
1240 self.storage_broker = storage_broker
1241 self.stats_provider = stats_provider
1242 self._all_downloads = weakref.WeakKeyDictionary() # for debugging
1244 def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
1245 assert isinstance(u, uri.CHKFileURI)
1246 t = IDownloadTarget(t)
1250 if self.stats_provider:
1251 # these counters are meant for network traffic, and don't
1253 self.stats_provider.count('downloader.files_downloaded', 1)
1254 self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
1256 target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
1259 dl = CiphertextDownloader(self.storage_broker,
1260 u.get_verify_cap(), target,
1262 self._all_downloads[dl] = None
1264 history.add_download(dl.get_download_status())
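
# Editorial usage sketch (cap, storage_broker, stats_provider, and consumer
# are assumed to exist; uri.from_string() is expected to yield a CHKFileURI
# for a CHK read-cap string):
#
#     downloader = Downloader(storage_broker, stats_provider)
#     d = downloader.download(uri.from_string(cap), ConsumerAdapter(consumer))
#     d.addCallback(lambda consumer_: ...)  # fires when the download finishes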