import os, random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import DeadReferenceError, RemoteException, eventually

from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, \
     IFileURI, IVerifierURI, \
     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
     UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

class IntegrityCheckReject(Exception):
    pass

class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass
class BadCrypttextHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass

class DownloadStopped(Exception):
    pass

class DownloadResults:
    implements(IDownloadResults)

    def __init__(self):
        self.servers_used = set()
        self.server_problems = {}
        self.servermap = {} # k: peerid, v: set of shnums
        self.timings = {} # k: timer name, v: seconds
        self.file_size = None

class DecryptingTarget(log.PrefixingLogMixin):
    implements(IDownloadTarget, IConsumer)
    def __init__(self, target, key, _log_msg_id=None):
        precondition(IDownloadTarget.providedBy(target), target)
        self.target = target
        self._decryptor = AES(key)
        prefix = str(target)
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
    # methods to satisfy the IConsumer interface
    def registerProducer(self, producer, streaming):
        if IConsumer.providedBy(self.target):
            self.target.registerProducer(producer, streaming)
    def unregisterProducer(self):
        if IConsumer.providedBy(self.target):
            self.target.unregisterProducer()
    def write(self, ciphertext):
        plaintext = self._decryptor.process(ciphertext)
        self.target.write(plaintext)
    def open(self, size):
        self.target.open(size)
    def close(self):
        self.target.close()
    def finish(self):
        return self.target.finish()
    # The following methods are just to pass through to the next target,
    # because that target might be a repairer.DownUpConnector, and because
    # the current CHKUpload object expects to find the storage index in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        self.target.set_storageindex(storageindex)
    def set_encodingparams(self, encodingparams):
        self.target.set_encodingparams(encodingparams)
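
# A DecryptingTarget just wraps another IDownloadTarget. Illustrative usage
# (hypothetical names, not from the source; `key` must be a 16-byte AES key,
# and Data is the in-memory target defined near the end of this module):
#
#   target = DecryptingTarget(Data(), key)
#   target.open(size)
#   target.write(ciphertext_chunk) # decrypted plaintext lands in the Data target
#   target.close()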

class ValidatedThingObtainer:
    def __init__(self, validatedthingproxies, debugname, log_id):
        self._validatedthingproxies = validatedthingproxies
        self._debugname = debugname
        self._log_id = log_id

    def _bad(self, f, validatedthingproxy):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject, layout.LayoutInvalid,
                          layout.ShareVersionIncompatible)
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        elif f.check(RemoteException):
            level = log.WEIRD
        else:
            level = log.SCARY
        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
                failure=f, level=level, umid="JGXxBA")
        if not self._validatedthingproxies:
            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
        # try again with a different one
        d = self._try_the_next_one()
        return d

    def _try_the_next_one(self):
        vtp = self._validatedthingproxies.pop(0)
        d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
        d.addErrback(self._bad, vtp)
        return d

    def start(self):
        return self._try_the_next_one()
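
# Illustrative usage (hypothetical proxies, not from the source): each proxy
# must offer the "Validated Thing" protocol, i.e. a start() method whose
# Deferred fires with a validated object or else errbacks:
#
#   vto = ValidatedThingObtainer([proxy_a, proxy_b], debugname="uebs",
#                                log_id=None)
#   d = vto.start() # tries proxy_a first, falls back to proxy_b on failure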

class ValidatedCrypttextHashTreeProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote crypttext hash tree using a local
    ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
    Validated Thing protocol (i.e., I have a start() method that fires with
    self once I get a valid one). """
    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._readbucketproxy = readbucketproxy
        self._num_segments = num_segments
        self._fetch_failures = fetch_failures
        self._crypttext_hash_tree = crypttext_hash_tree

    def _validate(self, proposal):
        ct_hashes = dict(list(enumerate(proposal)))
        try:
            self._crypttext_hash_tree.set_hashes(ct_hashes)
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            if self._fetch_failures is not None:
                self._fetch_failures["crypttext_hash_tree"] += 1
            raise BadOrMissingHash(le)
        # If we now have enough of the crypttext hash tree to integrity-check
        # *any* segment of ciphertext, then we are done.
        # TODO: alacrity would be better if we downloaded only part of the
        # crypttext hash tree at a time.
        for segnum in range(self._num_segments):
            if self._crypttext_hash_tree.needed_hashes(segnum):
                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
        return self

    def start(self):
        d = self._readbucketproxy.get_crypttext_hashes()
        d.addCallback(self._validate)
        return d

class ValidatedExtendedURIProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB. """

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        h = hashutil.uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data # it is valid

    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
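
        # Worked example with illustrative numbers (not from the source):
        # size=1000000, segment_size=131072, needed_shares=3 gives
        # block_size = div_ceil(131072, 3) = 43691,
        # num_segments = div_ceil(1000000, 131072) = 8,
        # tail_data_size = 1000000 % 131072 = 82432, and
        # tail_segment_size = next_multiple(82432, 3) = 82434.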

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']

        # Next: things that are optional and not redundant: crypttext_hash
        if 'crypttext_hash' in d:
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))

        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if 'codec_name' in d:
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if 'codec_params' in d:
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
                                      "self.segment_size: %s" % (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (ucpns,
                                                                             self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" % (ucpts,
                                                                            self._verifycap.total_shares))
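
        # (codec_params is a serialized triple like "131072-3-10", i.e.
        # "segmentsize-neededshares-totalshares"; the numbers here are
        # illustrative, not from the source.)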

        if 'tail_codec_params' in d:
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if 'num_segments' in d:
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if 'size' in d:
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if 'needed_shares' in d:
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if 'total_shares' in d:
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used. Ignoring. %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used. Ignoring. %s" % (self,))

        return self

    def start(self):
        """ Fetch the UEB from bucket, compare its hash to the hash from
        verifycap, then parse it. Returns a deferred which is called back
        with self once the fetch is successful, or is erred back if it
        fails. """
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d

class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
        """ share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB """
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
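
        # (hashtree.IncompleteHashTree(n) is a Merkle tree over n leaves,
        # padded up to a power of two; node 0 is the root, the block hashes
        # are the leaves, and needed_hashes()/set_hashes() track which
        # interior nodes must still be fetched before a leaf can be
        # verified.)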

    def get_block(self, blocknum):
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
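        # (e.g., with illustrative numbers: share_size=1000 and block_size=300
        # give blocks 0..2 of 300 bytes and a final block of 1000 % 300 = 100
        # bytes; if share_size were exactly 900, the "% == 0" branch keeps
        # the final block at the full 300.)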
        d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below
        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise an exception if the values being passed do
                # not match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leaves of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = hashutil.block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #         "%r .. %r: %s" %
            #         (self.sharenum, blocknum, len(blockdata),
            #          blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                         (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata

class BlockDownloader(log.PrefixingLogMixin):
    """I am responsible for downloading a single block (from a single bucket)
    for a single segment.

    I am a child of the SegmentDownloader.
    """

    def __init__(self, vbucket, blocknum, parent, results):
        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
        prefix = "%s-%d" % (vbucket, blocknum)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.vbucket = vbucket
        self.blocknum = blocknum
        self.parent = parent
        self.results = results

    def start(self, segnum):
        self.log("get_block(segnum=%d)" % segnum)
        started = time.time()
        d = self.vbucket.get_block(segnum)
        d.addCallbacks(self._hold_block, self._got_block_error,
                       callbackArgs=(started,))
        return d

    def _hold_block(self, data, started):
        if self.results:
            elapsed = time.time() - started
            peerid = self.vbucket.bucket.get_peerid()
            if peerid not in self.results.timings["fetch_per_server"]:
                self.results.timings["fetch_per_server"][peerid] = []
            self.results.timings["fetch_per_server"][peerid].append(elapsed)
        self.log("got block")
        self.parent.hold_block(self.blocknum, data)

    def _got_block_error(self, f):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject,
                          layout.LayoutInvalid, layout.ShareVersionIncompatible)
        if f.check(RemoteException, DeadReferenceError):
            level = log.UNUSUAL
        else:
            level = log.WEIRD
        self.log("failure to get block", level=level, umid="5Z4uHQ")
        if self.results:
            peerid = self.vbucket.bucket.get_peerid()
            self.results.server_problems[peerid] = str(f)
        self.parent.bucket_failed(self.vbucket)

class SegmentDownloader:
    """I am responsible for downloading all the blocks for a single segment
    of data.

    I am a child of the CiphertextDownloader.
    """

    def __init__(self, parent, segmentnumber, needed_shares, results):
        self.parent = parent
        self.segmentnumber = segmentnumber
        self.needed_blocks = needed_shares
        self.blocks = {} # k: blocknum, v: data
        self.results = results
        self._log_number = self.parent.log("starting segment %d" %
                                           segmentnumber)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return self.parent.log(*args, **kwargs)

    def start(self):
        return self._download()

    def _download(self):
        d = self._try()
        def _done(res):
            if len(self.blocks) >= self.needed_blocks:
                # we only need self.needed_blocks blocks
                # we want to get the smallest blockids, because they are
                # more likely to be fast "primary blocks"
                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
                blocks = []
                for blocknum in blockids:
                    blocks.append(self.blocks[blocknum])
                return (blocks, blockids)
            else:
                return self._download()
        d.addCallback(_done)
        return d

    def _try(self):
        # fill our set of active buckets, maybe raising NotEnoughSharesError
        active_buckets = self.parent._activate_enough_buckets()
        # Now we have enough buckets, in self.parent.active_buckets.

        # in test cases, bd.start might mutate active_buckets right away, so
        # we need to put off calling start() until we've iterated all the way
        # through it.
        downloaders = []
        for blocknum, vbucket in active_buckets.iteritems():
            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
            bd = BlockDownloader(vbucket, blocknum, self, self.results)
            downloaders.append(bd)
            if self.results:
                self.results.servers_used.add(vbucket.bucket.get_peerid())
        l = [bd.start(self.segmentnumber) for bd in downloaders]
        return defer.DeferredList(l, fireOnOneErrback=True)

    def hold_block(self, blocknum, data):
        self.blocks[blocknum] = data

    def bucket_failed(self, vbucket):
        self.parent.bucket_failed(vbucket)

class DownloadStatus:
    implements(IDownloadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = 0.0
        self.paused = False
        self.stopped = False
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        status = self.status
        if self.paused:
            status += " (output paused)"
        if self.stopped:
            status += " (output stopped)"
        return status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_paused(self, paused):
        self.paused = paused
    def set_stopped(self, stopped):
        self.stopped = stopped
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CiphertextDownloader(log.PrefixingLogMixin):
    """ I download shares, check their integrity, then decode them, check the
    integrity of the resulting ciphertext, then write it to my target.
    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method)."""
    implements(IPushProducer)

    def __init__(self, storage_broker, v, target, monitor):

        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
        precondition(IVerifierURI.providedBy(v), v)
        precondition(IDownloadTarget.providedBy(target), target)

        prefix = base32.b2a_l(v.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self._storage_broker = storage_broker

        self._verifycap = v
        self._storage_index = v.storage_index
        self._uri_extension_hash = v.uri_extension_hash

        self._started = time.time()
        self._status = s = DownloadStatus()
        s.set_status("Starting")
        s.set_storage_index(self._storage_index)
        s.set_size(self._verifycap.size)
        s.set_helper(False)
        s.set_active(True)

        self._results = DownloadResults()
        s.set_results(self._results)
        self._results.file_size = self._verifycap.size
        self._results.timings["servers_peer_selection"] = {}
        self._results.timings["fetch_per_server"] = {}
        self._results.timings["cumulative_fetch"] = 0.0
        self._results.timings["cumulative_decode"] = 0.0
        self._results.timings["cumulative_decrypt"] = 0.0
        self._results.timings["paused"] = 0.0

        self._paused = False
        self._stopped = False
        if IConsumer.providedBy(target):
            target.registerProducer(self, True)
        self._target = target
        self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
        self._monitor = monitor
        self._opened = False

        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets

        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }

        self._ciphertext_hasher = hashutil.crypttext_hasher()

        self._bytes_done = 0
        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)

        # _got_uri_extension() will create the following:
        # self._crypttext_hash_tree
        # self._share_hash_tree
        # self._current_segnum = 0
        # self._vup # ValidatedExtendedURIProxy

    def pauseProducing(self):
        if self._paused:
            raise AssertionError("nested pauseProducing")
        self._paused = defer.Deferred()
        self._paused_at = time.time()
        if self._status:
            self._status.set_paused(True)

    def resumeProducing(self):
        if self._paused:
            paused_for = time.time() - self._paused_at
            self._results.timings['paused'] += paused_for
            p = self._paused
            self._paused = None
            eventually(p.callback, None)
            if self._status:
                self._status.set_paused(False)

    def stopProducing(self):
        self.log("Download.stopProducing")
        self._stopped = True
        self.resumeProducing()
        if self._status:
            self._status.set_stopped(True)
            self._status.set_active(False)

    def start(self):
        self.log("starting download")

        # first step: who should we download from?
        d = defer.maybeDeferred(self._get_all_shareholders)
        d.addCallback(self._got_all_shareholders)
        # now get the uri_extension block from somebody and integrity check
        # it and parse and validate its contents
        d.addCallback(self._obtain_uri_extension)
        d.addCallback(self._get_crypttext_hash_tree)
        # once we know that, we can download blocks from everybody
        d.addCallback(self._download_all_segments)

        def _finished(res):
            if self._status:
                self._status.set_status("Finished")
                self._status.set_active(False)
                self._status.set_paused(False)
            if IConsumer.providedBy(self._target):
                self._target.unregisterProducer()
            return res
        d.addCallback(_finished)

        def _failed(why):
            if self._status:
                self._status.set_status("Failed")
                self._status.set_active(False)
            if why.check(DownloadStopped):
                # DownloadStopped just means the consumer aborted the
                # download; not so scary.
                self.log("download stopped", level=log.UNUSUAL)
            else:
                # This is really unusual, and deserves maximum forensics.
                self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
            return why
        d.addErrback(_failed)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self):
        dl = []
        sb = self._storage_broker
        servers = sb.get_servers_for_index(self._storage_index)
        if not servers:
            raise NoServersError("broker gave us no servers!")
        for (peerid,ss) in servers:
            self.log(format="sending DYHB to [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, umid="rT03hg")
            d = ss.callRemote("get_buckets", self._storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        self._responses_received = 0
        self._queries_sent = len(dl)
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 shnums=sorted(buckets.keys()),
                 level=log.NOISY, umid="o4uwFg")
        self._responses_received += 1
        if self._results:
            elapsed = time.time() - self._started
            self._results.timings["servers_peer_selection"][peerid] = elapsed
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        for sharenum, bucket in buckets.iteritems():
            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
            self.add_share_bucket(sharenum, b)

            if self._results:
                if peerid not in self._results.servermap:
                    self._results.servermap[peerid] = set()
                self._results.servermap[peerid].add(sharenum)

    def add_share_bucket(self, sharenum, bucket):
        # this is split out for the benefit of test_encode.py
        self._share_buckets.append( (sharenum, bucket) )

    def _got_error(self, f):
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log("Error during get_buckets", failure=f, level=level)

    def bucket_failed(self, vbucket):
        shnum = vbucket.sharenum
        del self.active_buckets[shnum]
        s = self._share_vbuckets[shnum]
        # s is a set of ValidatedReadBucketProxy instances
        s.discard(vbucket)
        # ... which might now be empty
        if not s:
            # there are no more buckets which can provide this share, so
            # remove the key. This may prompt us to use a different share.
            del self._share_vbuckets[shnum]

    def _got_all_shareholders(self, res):
        if self._results:
            now = time.time()
            self._results.timings["peer_selection"] = now - self._started

        if len(self._share_buckets) < self._verifycap.needed_shares:
            msg = "Failed to get enough shareholders: have %d, need %d" \
                  % (len(self._share_buckets), self._verifycap.needed_shares)
            if self._share_buckets:
                raise NotEnoughSharesError(msg)
            else:
                raise NoSharesError(msg)

        #for s in self._share_vbuckets.values():
        #    for vb in s:
        #        assert isinstance(vb, ValidatedReadBucketProxy), \
        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)

    def _obtain_uri_extension(self, ignored):
        # all shareholders are supposed to have a copy of uri_extension, and
        # all are supposed to be identical. We compute the hash of the data
        # that comes back, and compare it against the version in our URI. If
        # they don't match, ignore their data and try someone else.
        if self._status:
            self._status.set_status("Obtaining URI Extension")

        uri_extension_fetch_started = time.time()

        vups = []
        for sharenum, bucket in self._share_buckets:
            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
        d = vto.start()

        def _got_uri_extension(vup):
            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
            if self._results:
                elapsed = time.time() - uri_extension_fetch_started
                self._results.timings["uri_extension"] = elapsed

            self._vup = vup
            self._codec = codec.CRSDecoder()
            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
            self._tail_codec = codec.CRSDecoder()
            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)

            self._current_segnum = 0

            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})

            # Repairer (uploader) needs the encodingparams.
            self._target.set_encodingparams((
                self._verifycap.needed_shares,
                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
                self._verifycap.total_shares,
                self._vup.segment_size
                ))
        d.addCallback(_got_uri_extension)
        return d

    def _get_crypttext_hash_tree(self, res):
        vchtps = []
        for sharenum, bucket in self._share_buckets:
            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
            vchtps.append(vchtp)

        _get_crypttext_hash_tree_started = time.time()
        if self._status:
            self._status.set_status("Retrieving crypttext hash tree")

        vto = ValidatedThingObtainer(vchtps, debugname="vchtps", log_id=self._parentmsgid)
        d = vto.start()

        def _got_crypttext_hash_tree(res):
            # Good -- the self._crypttext_hash_tree that we passed to the
            # vchtp is now populated with hashes.
            if self._results:
                elapsed = time.time() - _get_crypttext_hash_tree_started
                self._results.timings["hashtrees"] = elapsed
        d.addCallback(_got_crypttext_hash_tree)
        return d

    def _activate_enough_buckets(self):
        """either return a mapping from shnum to a ValidatedReadBucketProxy
        that can provide data for that share, or raise NotEnoughSharesError"""

        while len(self.active_buckets) < self._verifycap.needed_shares:
            # need some more
            handled_shnums = set(self.active_buckets.keys())
            available_shnums = set(self._share_vbuckets.keys())
            potential_shnums = list(available_shnums - handled_shnums)
            if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
                have = len(potential_shnums) + len(self.active_buckets)
                msg = "Unable to activate enough shares: have %d, need %d" \
                      % (have, self._verifycap.needed_shares)
                if have:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            # For the next share, choose a primary share if available, else a
            # randomly chosen secondary.
            potential_shnums.sort()
            if potential_shnums[0] < self._verifycap.needed_shares:
                shnum = potential_shnums[0]
            else:
                shnum = random.choice(potential_shnums)
            # and a random bucket that will provide it
            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
            self.active_buckets[shnum] = validated_bucket
        return self.active_buckets
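
    # (e.g., with illustrative numbers: needed_shares=3 and
    # potential_shnums=[1, 5, 9] -- shnum 1 is a primary share because
    # 1 < 3, so it is activated first; if only [5, 9] remained, one of
    # those secondary shares would be chosen at random.)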

    def _download_all_segments(self, res):
        for sharenum, bucket in self._share_buckets:
            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        # after the above code, self._share_vbuckets contains enough
        # buckets to complete the download, and some extra ones to
        # tolerate some buckets dropping out or having
        # errors. self._share_vbuckets is a dictionary that maps from
        # shnum to a set of ValidatedBuckets, which themselves are
        # wrappers around RIBucketReader references.
        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance

        self._started_fetching = time.time()

        d = defer.succeed(None)
        for segnum in range(self._vup.num_segments):
            d.addCallback(self._download_segment, segnum)
            # this pause, at the end of write, prevents pre-fetch from
            # happening until the consumer is ready for more data.
            d.addCallback(self._check_for_pause)
        return d

    def _check_for_pause(self, res):
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ignored: d.callback(res))
            return d
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        self._monitor.raise_if_cancelled()
        return res

    def _download_segment(self, res, segnum):
        if self._status:
            self._status.set_status("Downloading segment %d of %d" %
                                    (segnum+1, self._vup.num_segments))
        self.log("downloading seg#%d of %d (%d%%)"
                 % (segnum, self._vup.num_segments,
                    100.0 * segnum / self._vup.num_segments))
        # memory footprint: when the SegmentDownloader finishes pulling down
        # all shares, we have 1*segment_size of usage.
        segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
                                        self._results)
        started = time.time()
        d = segmentdler.start()
        def _finished_fetching(res):
            elapsed = time.time() - started
            self._results.timings["cumulative_fetch"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_fetching)
        # pause before using more memory
        d.addCallback(self._check_for_pause)
        # while the codec does its job, we hit 2*segment_size
        def _started_decode(res):
            self._started_decode = time.time()
            return res
        if self._results:
            d.addCallback(_started_decode)
        if segnum + 1 == self._vup.num_segments:
            codec = self._tail_codec
        else:
            codec = self._codec
        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
        # once the codec is done, we drop back to 1*segment_size, because
        # 'shares' goes out of scope. The memory usage is all in the
        # plaintext now, spread out into a bunch of tiny buffers.
        def _finished_decode(res):
            elapsed = time.time() - self._started_decode
            self._results.timings["cumulative_decode"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_decode)

        # pause/check-for-stop just before writing, to honor stopProducing
        d.addCallback(self._check_for_pause)
        d.addCallback(self._got_segment)
        return d

    def _got_segment(self, buffers):
        precondition(self._crypttext_hash_tree)
        started_decrypt = time.time()
        self._status.set_progress(float(self._current_segnum)/self._verifycap.size)

        if self._current_segnum + 1 == self._vup.num_segments:
            # This is the last segment.
            # Trim off any padding added by the upload side. We never send
            # empty segments. If the data was an exact multiple of the
            # segment size, the last segment will be full.
            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
            # Remove buffers which don't contain any part of the tail.
            del buffers[num_buffers_used:]
            # Remove the past-the-tail-part of the last buffer.
            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
            if tail_in_last_buf == 0:
                tail_in_last_buf = tail_buf_size
            buffers[-1] = buffers[-1][:tail_in_last_buf]
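
            # (e.g., with illustrative numbers: tail_segment_size=82434 and
            # needed_shares=3 give tail_buf_size=27478; tail_data_size=82432
            # needs div_ceil(82432, 27478) = 3 buffers, and the last buffer
            # is trimmed to 82432 % 27478 = 27476 bytes.)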

        # First compute the hash of this segment and check that it fits.
        ch = hashutil.crypttext_segment_hasher()
        for buffer in buffers:
            self._ciphertext_hasher.update(buffer)
            ch.update(buffer)
        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})

        # Then write this segment to the target.
        if not self._opened:
            self._opened = True
            self._target.open(self._verifycap.size)

        for buffer in buffers:
            self._target.write(buffer)
            self._bytes_done += len(buffer)

        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
        self._current_segnum += 1

        if self._results:
            elapsed = time.time() - started_decrypt
            self._results.timings["cumulative_decrypt"] += elapsed

    def _done(self, res):
        self.log("download done")
        if self._results:
            now = time.time()
            self._results.timings["total"] = now - self._started
            self._results.timings["segments"] = now - self._started_fetching
        if self._vup.crypttext_hash:
            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
                    "bad crypttext_hash: computed=%s, expected=%s" %
                    (base32.b2a(self._ciphertext_hasher.digest()),
                     base32.b2a(self._vup.crypttext_hash)))
        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
        self._status.set_progress(1)
        self._target.close()
        return self._target.finish()
    def get_download_status(self):
        return self._status

class FileName:
    implements(IDownloadTarget)
    def __init__(self, filename):
        self._filename = filename
        self.f = None
    def open(self, size):
        self.f = open(self._filename, "wb")
        return self.f
    def write(self, data):
        self.f.write(data)
    def close(self):
        if self.f:
            self.f.close()
    def fail(self, why):
        if self.f:
            self.f.close()
        os.unlink(self._filename)
    def register_canceller(self, cb):
        pass # we won't use it
    def finish(self):
        pass
    # The following methods are just because the target might be a
    # repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class Data:
    implements(IDownloadTarget)
    def __init__(self):
        self._data = []
    def open(self, size):
        pass
    def write(self, data):
        self._data.append(data)
    def close(self):
        self.data = "".join(self._data)
        del self._data
    def fail(self, why):
        del self._data
    def register_canceller(self, cb):
        pass # we won't use it
    def finish(self):
        return self.data
    # The following methods are just because the target might be a
    # repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class FileHandle:
    """Use me to download data to a pre-defined filehandle-like object. I
    will use the target's write() method. I will *not* close the filehandle:
    I leave that up to the originator of the filehandle. The download process
    will return the filehandle when it completes.
    """
    implements(IDownloadTarget)
    def __init__(self, filehandle):
        self._filehandle = filehandle
    def open(self, size):
        pass
    def write(self, data):
        self._filehandle.write(data)
    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._filehandle
    # The following methods are just because the target might be a
    # repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class ConsumerAdapter:
    implements(IDownloadTarget, IConsumer)
    def __init__(self, consumer):
        self._consumer = consumer

    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def open(self, size):
        pass
    def write(self, data):
        self._consumer.write(data)
    def close(self):
        pass

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._consumer
    # The following methods are just because the target might be a
    # repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class Downloader:
    """I am a service that allows file downloading.
    """
    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
    # It is scheduled to go away, to be replaced by filenode.download()
    implements(IDownloader)

    def __init__(self, storage_broker, stats_provider):
        self.storage_broker = storage_broker
        self.stats_provider = stats_provider
        self._all_downloads = weakref.WeakKeyDictionary() # for debugging

    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
        t = IDownloadTarget(t)

        assert isinstance(u, uri.CHKFileURI)
        if self.stats_provider:
            # these counters are meant for network traffic, and don't
            # include LIT files
            self.stats_provider.count('downloader.files_downloaded', 1)
            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())

        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
        if not monitor:
            monitor = Monitor()
        dl = CiphertextDownloader(self.storage_broker,
                                  u.get_verify_cap(), target,
                                  monitor=monitor)
        self._all_downloads[dl] = None
        if history:
            history.add_download(dl.get_download_status())
        d = dl.start()
        return d

    def download_to_data(self, uri, _log_msg_id=None, history=None):
        return self.download(uri, Data(), _log_msg_id=_log_msg_id, history=history)
    def download_to_filename(self, uri, filename, _log_msg_id=None):
        return self.download(uri, FileName(filename), _log_msg_id=_log_msg_id)
    def download_to_filehandle(self, uri, filehandle, _log_msg_id=None):
        return self.download(uri, FileHandle(filehandle), _log_msg_id=_log_msg_id)
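
# Illustrative usage (hypothetical setup, not from the source; the storage
# broker and stats provider normally come from the client node, and `u` must
# be a uri.CHKFileURI read-cap):
#
#   downloader = Downloader(storage_broker, stats_provider=None)
#   d = downloader.download_to_data(u)
#   d.addCallback(lambda plaintext: ...)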