import os, random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.application import service
from foolscap.api import DeadReferenceError, RemoteException, eventually

from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, \
     IFileURI, IVerifierURI, \
     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
     UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES
class IntegrityCheckReject(Exception):
    pass

class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass
class BadCrypttextHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass

class DownloadStopped(Exception):
    pass

class DownloadResults:
    implements(IDownloadResults)

    def __init__(self):
        self.servers_used = set()
        self.server_problems = {}
        self.servermap = {}
        self.timings = {}
        self.file_size = None
class DecryptingTarget(log.PrefixingLogMixin):
    implements(IDownloadTarget, IConsumer)
    def __init__(self, target, key, _log_msg_id=None):
        precondition(IDownloadTarget.providedBy(target), target)
        self.target = target
        self._decryptor = AES(key)
        prefix = str(target)
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
    # methods to satisfy the IConsumer interface
    def registerProducer(self, producer, streaming):
        if IConsumer.providedBy(self.target):
            self.target.registerProducer(producer, streaming)
    def unregisterProducer(self):
        if IConsumer.providedBy(self.target):
            self.target.unregisterProducer()
    def write(self, ciphertext):
        plaintext = self._decryptor.process(ciphertext)
        self.target.write(plaintext)
    def open(self, size):
        self.target.open(size)
    def close(self):
        self.target.close()
    def finish(self):
        return self.target.finish()
    # The following methods are just pass-throughs to the next target, because that target
    # might be a repairer.DownUpConnector, and because the current CHKUpload object expects
    # to find the storage index in its Uploadable.
    def set_storageindex(self, storageindex):
        self.target.set_storageindex(storageindex)
    def set_encodingparams(self, encodingparams):
        self.target.set_encodingparams(encodingparams)
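# Illustrative use of DecryptingTarget (names hypothetical): wrap any IDownloadTarget
# so that ciphertext written to it comes out as plaintext:
#
#   target = DecryptingTarget(FileName("out.txt"), readcap.key)
#   target.write(ciphertext_chunk)   # decrypted bytes land in out.txt
#
# pycryptopp's AES operates in CTR mode, so process() can be fed the ciphertext
# incrementally, in arbitrary-sized chunks, as segments arrive.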
class ValidatedThingObtainer:
    def __init__(self, validatedthingproxies, debugname, log_id):
        self._validatedthingproxies = validatedthingproxies
        self._debugname = debugname
        self._log_id = log_id

    def _bad(self, f, validatedthingproxy):
        f.trap(RemoteException, DeadReferenceError,
               IntegrityCheckReject, layout.LayoutInvalid,
               layout.ShareVersionIncompatible)
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        elif f.check(RemoteException):
            level = log.WEIRD
        else:
            level = log.SCARY
        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
                failure=f, level=level, umid="JGXxBA")
        if not self._validatedthingproxies:
            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
        # try again with a different one
        return self._try_the_next_one()

    def _try_the_next_one(self):
        vtp = self._validatedthingproxies.pop(0)
        # start() obtains, validates, and calls back with the thing, or else errbacks
        d = vtp.start()
        d.addErrback(self._bad, vtp)
        return d

    def start(self):
        return self._try_the_next_one()
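# A "validated thing proxy" couples one remote fetch with its integrity check.
# Illustrative use (hypothetical setup): given several proxies for the same thing,
#
#   vto = ValidatedThingObtainer([proxy1, proxy2], debugname="ueb", log_id=None)
#   d = vto.start()
#
# the Deferred fires with the first proxy whose start() both fetched and validated
# its thing; proxies that failed validation or whose server vanished are skipped.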
class ValidatedCrypttextHashTreeProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy --
    I use its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e.,
    I have a start() method that fires with self once I get a valid one). """
    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._readbucketproxy = readbucketproxy
        self._num_segments = num_segments
        self._fetch_failures = fetch_failures
        self._crypttext_hash_tree = crypttext_hash_tree

    def _validate(self, proposal):
        ct_hashes = dict(list(enumerate(proposal)))
        try:
            self._crypttext_hash_tree.set_hashes(ct_hashes)
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            if self._fetch_failures is not None:
                self._fetch_failures["crypttext_hash_tree"] += 1
            raise BadOrMissingHash(le)
        # If we now have enough of the crypttext hash tree to integrity-check *any* segment
        # of ciphertext, then we are done.
        # TODO: alacrity would be better if we downloaded only part of the crypttext hash
        # tree at a time.
        for segnum in range(self._num_segments):
            if self._crypttext_hash_tree.needed_hashes(segnum):
                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
        return self

    def start(self):
        d = self._readbucketproxy.get_crypttext_hashes()
        d.addCallback(self._validate)
        return d
class ValidatedExtendedURIProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
    retrieving and validating the elements from the UEB. """

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
    def _check_integrity(self, data):
        h = hashutil.uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data
    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB. First, things that
        # we really need to learn from the UEB in order to do this download. Next: things
        # which are optional but not redundant -- if they are present in the UEB they will
        # get used. Next, things that are optional and redundant. These things are required
        # to be consistent: they don't have to be in the UEB, but if they are in the UEB
        # then they will be checked for consistency with the already-known facts, and if
        # they are inconsistent then an exception will be raised. These things aren't
        # actually used -- they are just tested for consistency and ignored. Finally:
        # things which are deprecated -- they ought not be in the UEB at all, and if they
        # are present then a warning will be logged but they are otherwise ignored.
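        # For orientation (values hypothetical), an unpacked UEB is a dict shaped
        # roughly like:
        #   { 'segment_size': 131072,
        #     'crypttext_root_hash': <32-byte hash>, 'share_root_hash': <32-byte hash>,
        #     'codec_name': "crs", 'codec_params': "131072-3-10",
        #     'num_segments': 8, 'size': 1000000,
        #     'needed_shares': 3, 'total_shares': 10 }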
        # First, things that we really need to learn from the UEB: segment_size,
        # crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
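        # Worked example (hypothetical numbers): size=1000000, segment_size=131072,
        # needed_shares=3 gives block_size = div_ceil(131072, 3) = 43691,
        # num_segments = div_ceil(1000000, 131072) = 8, tail_data_size =
        # 1000000 % 131072 = 82496, and tail_segment_size = 82497 (the next multiple
        # of 3, so the tail erasure-codes into equal-sized blocks).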
        # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext
        # that matches this read-cap or verify-cap. The integrity check on the shares is
        # not sufficient to prevent the original encoder from creating some shares of
        # file A and other shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']

        # Next: things that are optional and not redundant: crypttext_hash
        if d.has_key('crypttext_hash'):
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
        # Next: things that are optional, redundant, and required to be consistent:
        # codec_name, codec_params, tail_codec_params, num_segments, size, needed_shares,
        # total_shares
        if d.has_key('codec_name'):
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if d.has_key('codec_params'):
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
                                      "self.segment_size: %s" % (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (ucpns,
                                                                             self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" % (ucpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('tail_codec_params'):
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))
        if d.has_key('num_segments'):
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments,
                                                                d['num_segments']))

        if d.has_key('size'):
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if d.has_key('needed_shares'):
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if d.has_key('total_shares'):
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))
        # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used. Ignoring. %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used. Ignoring. %s" % (self,))

        return self
306 """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
307 it. Returns a deferred which is called back with self once the fetch is successful, or
308 is erred back if it fails. """
309 d = self._readbucketproxy.get_uri_extension()
310 d.addCallback(self._check_integrity)
311 d.addCallback(self._parse_and_validate)
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for retrieving and
    validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
        """ share_hash_tree is required to have already been initialized with the root hash
        (the number-0 hash), using the share_root_hash from the UEB. """
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
    def get_block(self, blocknum):
        # The first time we use this bucket, we need to fetch enough elements of the share
        # hash tree to validate it from our share hash up to the hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to validate the
        # requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl
    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below
        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise an exception if the values being passed do not match the
                # root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of the range
                    # that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree, which is also
            # one of the leafs of the share hash tree, and is called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree, and the share hash wasn't sent by
                    # the server either.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = hashutil.block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #         "%r .. %r: %s" %
            #         (self.sharenum, blocknum, len(blockdata),
            #          blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more likely a
            # programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                         (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't like what they
        # saw, they would have raised a BadHashError, causing our caller to see a Failure
        # and thus ignore this block (as well as dropping this bucket).
        return blockdata
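# Illustrative flow (hypothetical caller): a BlockDownloader does
#
#   vrbp = ValidatedReadBucketProxy(shnum, rbp, share_hash_tree,
#                                   num_blocks, block_size, share_size)
#   d = vrbp.get_block(segnum)
#
# and the Deferred fires with block data that has been checked against both hash
# trees, or fails with BadOrMissingHash, in which case the caller drops this bucket
# and falls back to a different share.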
class BlockDownloader(log.PrefixingLogMixin):
    """I am responsible for downloading a single block (from a single bucket) for a
    single segment.

    I am a child of the SegmentDownloader.
    """

    def __init__(self, vbucket, blocknum, parent, results):
        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
        prefix = "%s-%d" % (vbucket, blocknum)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.vbucket = vbucket
        self.blocknum = blocknum
        self.parent = parent
        self.results = results
    def start(self, segnum):
        self.log("get_block(segnum=%d)" % segnum)
        started = time.time()
        d = self.vbucket.get_block(segnum)
        d.addCallbacks(self._hold_block, self._got_block_error,
                       callbackArgs=(started,))
        return d

    def _hold_block(self, data, started):
        if self.results:
            elapsed = time.time() - started
            peerid = self.vbucket.bucket.get_peerid()
            if peerid not in self.results.timings["fetch_per_server"]:
                self.results.timings["fetch_per_server"][peerid] = []
            self.results.timings["fetch_per_server"][peerid].append(elapsed)
        self.log("got block")
        self.parent.hold_block(self.blocknum, data)

    def _got_block_error(self, f):
        f.trap(RemoteException, DeadReferenceError,
               IntegrityCheckReject,
               layout.LayoutInvalid, layout.ShareVersionIncompatible)
        if f.check(RemoteException, DeadReferenceError):
            level = log.UNUSUAL
        else:
            level = log.WEIRD
        self.log("failure to get block", level=level, umid="5Z4uHQ")
        if self.results:
            peerid = self.vbucket.bucket.get_peerid()
            self.results.server_problems[peerid] = str(f)
        self.parent.bucket_failed(self.vbucket)
class SegmentDownloader:
    """I am responsible for downloading all the blocks for a single segment
    of data.

    I am a child of the CiphertextDownloader.
    """

    def __init__(self, parent, segmentnumber, needed_shares, results):
        self.parent = parent
        self.segmentnumber = segmentnumber
        self.needed_blocks = needed_shares
        self.blocks = {} # k: blocknum, v: data
        self.results = results
        self._log_number = self.parent.log("starting segment %d" %
                                           segmentnumber)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return self.parent.log(*args, **kwargs)
    def start(self):
        return self._download()

    def _download(self):
        d = self._try()
        def _done(res):
            if len(self.blocks) >= self.needed_blocks:
                # we only need self.needed_blocks blocks
                # we want to get the smallest blockids, because they are
                # more likely to be fast "primary blocks"
                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
                blocks = []
                for blocknum in blockids:
                    blocks.append(self.blocks[blocknum])
                return (blocks, blockids)
            else:
                return self._download()
        d.addCallback(_done)
        return d
    def _try(self):
        # fill our set of active buckets, maybe raising NotEnoughSharesError
        active_buckets = self.parent._activate_enough_buckets()
        # Now we have enough buckets, in self.parent.active_buckets.

        # in test cases, bd.start might mutate active_buckets right away, so
        # we need to put off calling start() until we've iterated all the way
        # through it.
        downloaders = []
        for blocknum, vbucket in active_buckets.iteritems():
            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
            bd = BlockDownloader(vbucket, blocknum, self, self.results)
            downloaders.append(bd)
            if self.results:
                self.results.servers_used.add(vbucket.bucket.get_peerid())
        l = [bd.start(self.segmentnumber) for bd in downloaders]
        return defer.DeferredList(l, fireOnOneErrback=True)
    def hold_block(self, blocknum, data):
        self.blocks[blocknum] = data

    def bucket_failed(self, vbucket):
        self.parent.bucket_failed(vbucket)
class DownloadStatus:
    implements(IDownloadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.paused = False
        self.stopped = False
        self.progress = 0.0
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        status = self.status
        if self.paused:
            status += " (output paused)"
        if self.stopped:
            status += " (output stopped)"
        return status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_paused(self, paused):
        self.paused = paused
    def set_stopped(self, stopped):
        self.stopped = stopped
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value
class CiphertextDownloader(log.PrefixingLogMixin):
    """ I download shares, check their integrity, then decode them, check the integrity
    of the resulting ciphertext, and then write it to my target. Before I send any new
    request to a server, I always ask the 'monitor' object that was passed into my
    constructor whether this task has been cancelled (by invoking its
    raise_if_cancelled() method)."""
    implements(IPushProducer)

    def __init__(self, storage_broker, v, target, monitor):
        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
        precondition(IVerifierURI.providedBy(v), v)
        precondition(IDownloadTarget.providedBy(target), target)

        prefix=base32.b2a_l(v.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self._storage_broker = storage_broker

        self._verifycap = v
        self._storage_index = v.storage_index
        self._uri_extension_hash = v.uri_extension_hash

        self._started = time.time()
        self._status = s = DownloadStatus()
        s.set_status("Starting")
        s.set_storage_index(self._storage_index)
        s.set_size(self._verifycap.size)
        s.set_helper(False)
        s.set_active(True)

        self._results = DownloadResults()
        s.set_results(self._results)
        self._results.file_size = self._verifycap.size
        self._results.timings["servers_peer_selection"] = {}
        self._results.timings["fetch_per_server"] = {}
        self._results.timings["cumulative_fetch"] = 0.0
        self._results.timings["cumulative_decode"] = 0.0
        self._results.timings["cumulative_decrypt"] = 0.0
        self._results.timings["paused"] = 0.0

        self._paused = False
        self._stopped = False
        if IConsumer.providedBy(target):
            target.registerProducer(self, True)
        self._target = target
        self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
        self._monitor = monitor
        self._opened = False

        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets

        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }

        self._ciphertext_hasher = hashutil.crypttext_hasher()

        self._bytes_done = 0
        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)

        # _got_uri_extension() will create the following:
        # self._crypttext_hash_tree
        # self._share_hash_tree
        # self._current_segnum = 0
        # self._vup # ValidatedExtendedURIProxy
    def pauseProducing(self):
        if self._paused:
            return
        self._paused = defer.Deferred()
        self._paused_at = time.time()
        if self._status:
            self._status.set_paused(True)

    def resumeProducing(self):
        if self._paused:
            paused_for = time.time() - self._paused_at
            self._results.timings['paused'] += paused_for
            p = self._paused
            self._paused = None
            eventually(p.callback, None)
            if self._status:
                self._status.set_paused(False)

    def stopProducing(self):
        self.log("Download.stopProducing")
        self._stopped = True
        self.resumeProducing()
        if self._status:
            self._status.set_stopped(True)
            self._status.set_active(False)
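    # Pause/stop protocol, for orientation: a consumer that registered us via
    # target.registerProducer(self, True) may call pauseProducing() at any time.
    # That replaces self._paused with a Deferred; _check_for_pause() (below) chains
    # on it between segments, so no further fetching happens until resumeProducing()
    # fires it. stopProducing() additionally sets self._stopped, which makes
    # _check_for_pause() raise DownloadStopped at the next opportunity.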
714 self.log("starting download")
716 # first step: who should we download from?
717 d = defer.maybeDeferred(self._get_all_shareholders)
718 d.addCallback(self._got_all_shareholders)
719 # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
720 d.addCallback(self._obtain_uri_extension)
721 d.addCallback(self._get_crypttext_hash_tree)
722 # once we know that, we can download blocks from everybody
723 d.addCallback(self._download_all_segments)
726 self._status.set_status("Finished")
727 self._status.set_active(False)
728 self._status.set_paused(False)
729 if IConsumer.providedBy(self._target):
730 self._target.unregisterProducer()
735 self._status.set_status("Failed")
736 self._status.set_active(False)
737 if why.check(DownloadStopped):
738 # DownloadStopped just means the consumer aborted the download; not so scary.
739 self.log("download stopped", level=log.UNUSUAL)
741 # This is really unusual, and deserves maximum forensics.
742 self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
744 d.addErrback(_failed)
745 d.addCallback(self._done)
    def _get_all_shareholders(self):
        dl = []
        sb = self._storage_broker
        servers = sb.get_servers_for_index(self._storage_index)
        if not servers:
            raise NoServersError("broker gave us no servers!")
        for (peerid,ss) in servers:
            self.log(format="sending DYHB to [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, umid="rT03hg")
            d = ss.callRemote("get_buckets", self._storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        self._responses_received = 0
        self._queries_sent = len(dl)
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        return defer.DeferredList(dl)
    def _got_response(self, buckets, peerid):
        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 shnums=sorted(buckets.keys()),
                 level=log.NOISY, umid="o4uwFg")
        self._responses_received += 1
        if self._results:
            elapsed = time.time() - self._started
            self._results.timings["servers_peer_selection"][peerid] = elapsed
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        for sharenum, bucket in buckets.iteritems():
            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
            self.add_share_bucket(sharenum, b)

            if self._results:
                if peerid not in self._results.servermap:
                    self._results.servermap[peerid] = set()
                self._results.servermap[peerid].add(sharenum)
    def add_share_bucket(self, sharenum, bucket):
        # this is split out for the benefit of test_encode.py
        self._share_buckets.append( (sharenum, bucket) )

    def _got_error(self, f):
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log("Error during get_buckets", failure=f, level=level)
    def bucket_failed(self, vbucket):
        shnum = vbucket.sharenum
        del self.active_buckets[shnum]
        s = self._share_vbuckets[shnum]
        # s is a set of ValidatedReadBucketProxy instances
        s.discard(vbucket)
        # ... which might now be empty
        if not s:
            # there are no more buckets which can provide this share, so
            # remove the key. This may prompt us to use a different share.
            del self._share_vbuckets[shnum]
    def _got_all_shareholders(self, res):
        if self._results:
            now = time.time()
            self._results.timings["peer_selection"] = now - self._started

        if len(self._share_buckets) < self._verifycap.needed_shares:
            msg = "Failed to get enough shareholders: have %d, need %d" \
                  % (len(self._share_buckets), self._verifycap.needed_shares)
            if self._share_buckets:
                raise NotEnoughSharesError(msg)
            else:
                raise NoSharesError(msg)

        #for s in self._share_vbuckets.values():
        #    for vb in s:
        #        assert isinstance(vb, ValidatedReadBucketProxy), \
        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
    def _obtain_uri_extension(self, ignored):
        # all shareholders are supposed to have a copy of uri_extension, and
        # all are supposed to be identical. We compute the hash of the data
        # that comes back, and compare it against the version in our URI. If
        # they don't match, ignore their data and try someone else.
        if self._status:
            self._status.set_status("Obtaining URI Extension")

        uri_extension_fetch_started = time.time()

        vups = []
        for sharenum, bucket in self._share_buckets:
            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
        d = vto.start()

        def _got_uri_extension(vup):
            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
            if self._results:
                elapsed = time.time() - uri_extension_fetch_started
                self._results.timings["uri_extension"] = elapsed

            self._vup = vup
            self._codec = codec.CRSDecoder()
            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
            self._tail_codec = codec.CRSDecoder()
            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)

            self._current_segnum = 0

            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})

            # Repairer (uploader) needs the encodingparams.
            self._target.set_encodingparams((
                self._verifycap.needed_shares,
                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
                self._verifycap.total_shares,
                self._vup.segment_size
                ))
        d.addCallback(_got_uri_extension)
        return d
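    # For example (hypothetical numbers): with k=3, N=10 and segment_size=131072, the
    # decoder built above will accept any 3 of a segment's 10 blocks and reconstruct
    # the segment; the tail codec does the same for the (shorter) final segment.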
    def _get_crypttext_hash_tree(self, res):
        vchtps = []
        for sharenum, bucket in self._share_buckets:
            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
            vchtps.append(vchtp)

        _get_crypttext_hash_tree_started = time.time()
        if self._status:
            self._status.set_status("Retrieving crypttext hash tree")

        vto = ValidatedThingObtainer(vchtps, debugname="vchtps", log_id=self._parentmsgid)
        d = vto.start()

        def _got_crypttext_hash_tree(res):
            # Good -- the self._crypttext_hash_tree that we passed to vchtp is now
            # populated with hashes.
            if self._results:
                elapsed = time.time() - _get_crypttext_hash_tree_started
                self._results.timings["hashtrees"] = elapsed
        d.addCallback(_got_crypttext_hash_tree)
        return d
    def _activate_enough_buckets(self):
        """either return a mapping from shnum to a ValidatedReadBucketProxy that can
        provide data for that share, or raise NotEnoughSharesError"""

        while len(self.active_buckets) < self._verifycap.needed_shares:
            # need some more
            handled_shnums = set(self.active_buckets.keys())
            available_shnums = set(self._share_vbuckets.keys())
            potential_shnums = list(available_shnums - handled_shnums)
            if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
                have = len(potential_shnums) + len(self.active_buckets)
                msg = "Unable to activate enough shares: have %d, need %d" \
                      % (have, self._verifycap.needed_shares)
                if have:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            # For the next share, choose a primary share if available, else a randomly
            # chosen secondary share.
            potential_shnums.sort()
            if potential_shnums[0] < self._verifycap.needed_shares:
                shnum = potential_shnums[0]
            else:
                shnum = random.choice(potential_shnums)
            # and a random bucket that will provide it
            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
            self.active_buckets[shnum] = validated_bucket
        return self.active_buckets
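    # Example (hypothetical): with needed_shares=3 and surviving shares {0, 2, 7, 9},
    # the loop above activates 0 and 2 first (primary shares, shnum < k, whose blocks
    # hold the segment data directly and therefore decode fastest) and then picks one
    # of {7, 9} at random.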
    def _download_all_segments(self, res):
        for sharenum, bucket in self._share_buckets:
            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        # After the above code, self._share_vbuckets contains enough buckets to complete
        # the download, and some extra ones to tolerate some buckets dropping out or
        # having errors. self._share_vbuckets is a dictionary that maps from shnum to a
        # set of ValidatedBuckets, which themselves are wrappers around RIBucketReader
        # references.
        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance

        self._started_fetching = time.time()

        d = defer.succeed(None)
        for segnum in range(self._vup.num_segments):
            d.addCallback(self._download_segment, segnum)
            # this pause, at the end of write, prevents pre-fetch from
            # happening until the consumer is ready for more data.
            d.addCallback(self._check_for_pause)
        return d
    def _check_for_pause(self, res):
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ignored: d.callback(res))
            return d
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        self._monitor.raise_if_cancelled()
        return res
    def _download_segment(self, res, segnum):
        if self._status:
            self._status.set_status("Downloading segment %d of %d" %
                                    (segnum+1, self._vup.num_segments))
        self.log("downloading seg#%d of %d (%d%%)"
                 % (segnum, self._vup.num_segments,
                    100.0 * segnum / self._vup.num_segments))
        # memory footprint: when the SegmentDownloader finishes pulling down
        # all shares, we have 1*segment_size of usage.
        segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
                                        self._results)
        started = time.time()
        d = segmentdler.start()
        def _finished_fetching(res):
            elapsed = time.time() - started
            self._results.timings["cumulative_fetch"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_fetching)
        # pause before using more memory
        d.addCallback(self._check_for_pause)
        # while the codec does its job, we hit 2*segment_size
        def _started_decode(res):
            self._started_decode = time.time()
            return res
        if self._results:
            d.addCallback(_started_decode)
        if segnum + 1 == self._vup.num_segments:
            codec = self._tail_codec
        else:
            codec = self._codec
        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
        # once the codec is done, we drop back to 1*segment_size, because
        # 'shares' goes out of scope. The memory usage is all in the
        # plaintext now, spread out into a bunch of tiny buffers.
        def _finished_decode(res):
            elapsed = time.time() - self._started_decode
            self._results.timings["cumulative_decode"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_decode)

        # pause/check-for-stop just before writing, to honor stopProducing
        d.addCallback(self._check_for_pause)
        d.addCallback(self._got_segment)
        return d
    def _got_segment(self, buffers):
        precondition(self._crypttext_hash_tree)
        started_decrypt = time.time()
        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)

        if self._current_segnum + 1 == self._vup.num_segments:
            # This is the last segment.
            # Trim off any padding added by the upload side. We never send empty
            # segments. If the data was an exact multiple of the segment size, the last
            # segment will be full.
            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
            # Remove buffers which don't contain any part of the tail.
            del buffers[num_buffers_used:]
            # Remove the past-the-tail part of the last buffer.
            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
            if tail_in_last_buf == 0:
                tail_in_last_buf = tail_buf_size
            buffers[-1] = buffers[-1][:tail_in_last_buf]

        # First compute the hash of this segment and check that it fits.
        ch = hashutil.crypttext_segment_hasher()
        for buffer in buffers:
            self._ciphertext_hasher.update(buffer)
            ch.update(buffer)
        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})

        # Then write this segment to the target.
        if not self._opened:
            self._opened = True
            self._target.open(self._verifycap.size)

        for buffer in buffers:
            self._target.write(buffer)
            self._bytes_done += len(buffer)

        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
        self._current_segnum += 1

        if self._results:
            elapsed = time.time() - started_decrypt
            self._results.timings["cumulative_decrypt"] += elapsed
    def _done(self, res):
        self.log("download done")
        if self._results:
            now = time.time()
            self._results.timings["total"] = now - self._started
            self._results.timings["segments"] = now - self._started_fetching
        if self._vup.crypttext_hash:
            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
                    "bad crypttext_hash: computed=%s, expected=%s" %
                    (base32.b2a(self._ciphertext_hasher.digest()),
                     base32.b2a(self._vup.crypttext_hash)))
        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
        self._status.set_progress(1)
        self._target.close()
        return self._target.finish()
    def get_download_status(self):
        return self._status
class FileName:
    implements(IDownloadTarget)
    def __init__(self, filename):
        self._filename = filename
        self.f = None
    def open(self, size):
        self.f = open(self._filename, "wb")
    def write(self, data):
        self.f.write(data)
    def close(self):
        if self.f:
            self.f.close()
    def fail(self, why):
        if self.f:
            self.f.close()
            os.unlink(self._filename)
    def register_canceller(self, cb):
        pass # we won't use it
    def finish(self):
        pass
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass
class Data:
    implements(IDownloadTarget)
    def __init__(self):
        self._data = []
    def open(self, size):
        pass
    def write(self, data):
        self._data.append(data)
    def close(self):
        self.data = "".join(self._data)
    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass # we won't use it
    def finish(self):
        return self.data
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass
1127 """Use me to download data to a pre-defined filehandle-like object. I
1128 will use the target's write() method. I will *not* close the filehandle:
1129 I leave that up to the originator of the filehandle. The download process
1130 will return the filehandle when it completes.
1132 implements(IDownloadTarget)
1133 def __init__(self, filehandle):
1134 self._filehandle = filehandle
1135 def open(self, size):
1137 def write(self, data):
1138 self._filehandle.write(data)
1140 # the originator of the filehandle reserves the right to close it
1142 def fail(self, why):
1144 def register_canceller(self, cb):
1147 return self._filehandle
1148 # The following methods are just because the target might be a repairer.DownUpConnector,
1149 # and just because the current CHKUpload object expects to find the storage index and
1150 # encoding parameters in its Uploadable.
1151 def set_storageindex(self, storageindex):
1153 def set_encodingparams(self, encodingparams):
class ConsumerAdapter:
    implements(IDownloadTarget, IConsumer)
    def __init__(self, consumer):
        self._consumer = consumer

    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def open(self, size):
        pass
    def write(self, data):
        self._consumer.write(data)
    def close(self):
        pass

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._consumer
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass
class Downloader(service.MultiService):
    """I am a service that allows file downloading.
    """
    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
    # It is scheduled to go away, to be replaced by filenode.download()
    implements(IDownloader)
    name = "downloader"

    def __init__(self, stats_provider=None):
        service.MultiService.__init__(self)
        self.stats_provider = stats_provider
        self._all_downloads = weakref.WeakKeyDictionary() # for debugging

    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
        assert self.parent
        assert self.running
        u = IFileURI(u)
        t = IDownloadTarget(t)
        assert t.write
        assert t.close

        assert isinstance(u, uri.CHKFileURI)
        if self.stats_provider:
            # these counters are meant for network traffic, and don't
            # include LIT files
            self.stats_provider.count('downloader.files_downloaded', 1)
            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
        storage_broker = self.parent.get_storage_broker()

        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
        if not monitor:
            monitor = Monitor()
        dl = CiphertextDownloader(storage_broker, u.get_verify_cap(), target,
                                  monitor)
        self._all_downloads[dl] = None
        if history:
            history.add_download(dl.get_download_status())
        d = dl.start()
        return d
    def download_to_data(self, uri, _log_msg_id=None, history=None):
        return self.download(uri, Data(), _log_msg_id=_log_msg_id, history=history)
    def download_to_filename(self, uri, filename, _log_msg_id=None):
        return self.download(uri, FileName(filename), _log_msg_id=_log_msg_id)
    def download_to_filehandle(self, uri, filehandle, _log_msg_id=None):
        return self.download(uri, FileHandle(filehandle), _log_msg_id=_log_msg_id)
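# Illustrative use of the service (hypothetical caller code):
#
#   downloader = client.getServiceNamed("downloader")
#   d = downloader.download_to_data(uri.from_string(readcap))
#   d.addCallback(lambda plaintext: ...)
#
# The returned Deferred fires once every segment has been fetched, erasure-decoded,
# checked against the crypttext hash tree, decrypted, and written to the target.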