import os, random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.application import service
from foolscap.api import DeadReferenceError, RemoteException, eventually

from allmydata.util import base32, deferredutil, hashutil, log, mathutil
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
     IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError, \
     UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

class IntegrityCheckReject(Exception):
    pass

class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass
class BadCrypttextHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass

class DownloadStopped(Exception):
    pass

class DownloadResults:
    implements(IDownloadResults)

    def __init__(self):
        self.servers_used = set()
        self.server_problems = {}
        self.servermap = {}
        self.timings = {}
        self.file_size = None

class DecryptingTarget(log.PrefixingLogMixin):
    implements(IDownloadTarget, IConsumer)
    def __init__(self, target, key, _log_msg_id=None):
        precondition(IDownloadTarget.providedBy(target), target)
        self.target = target
        self._decryptor = AES(key)
        prefix = str(target)
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
    # methods to satisfy the IConsumer interface
    def registerProducer(self, producer, streaming):
        if IConsumer.providedBy(self.target):
            self.target.registerProducer(producer, streaming)
    def unregisterProducer(self):
        if IConsumer.providedBy(self.target):
            self.target.unregisterProducer()
    def write(self, ciphertext):
        plaintext = self._decryptor.process(ciphertext)
        self.target.write(plaintext)
    def open(self, size):
        self.target.open(size)
    def close(self):
        self.target.close()
    def finish(self):
        return self.target.finish()
    # The following methods are just to pass through to the next target, and just because that
    # target might be a repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index in its Uploadable.
    def set_storageindex(self, storageindex):
        self.target.set_storageindex(storageindex)
    def set_encodingparams(self, encodingparams):
        self.target.set_encodingparams(encodingparams)
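
# Illustrative sketch (not part of this module's API): pycryptopp's AES is a
# CTR-mode stream cipher, which is why DecryptingTarget.write() can decrypt
# ciphertext in whatever chunk sizes it happens to receive. Assuming
# pycryptopp is available:
#
#     from pycryptopp.cipher.aes import AES
#     key = "k" * 16
#     c = AES(key).process("hello world")          # encrypt in one call
#     dec = AES(key)                               # fresh cipher, same key
#     p = dec.process(c[:4]) + dec.process(c[4:])  # decrypt in two chunks
#     assert p == "hello world"
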
class ValidatedThingObtainer:
    def __init__(self, validatedthingproxies, debugname, log_id):
        self._validatedthingproxies = validatedthingproxies
        self._debugname = debugname
        self._log_id = log_id

    def _bad(self, f, validatedthingproxy):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject, layout.LayoutInvalid,
                          layout.ShareVersionIncompatible)
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        elif f.check(RemoteException):
            level = log.WEIRD
        else:
            level = log.SCARY
        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
                failure=f, level=level, umid="JGXxBA")
        if not self._validatedthingproxies:
            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
        # try again with a different one
        d = self._try_the_next_one()
        return d

    def _try_the_next_one(self):
        vtp = self._validatedthingproxies.pop(0)
        d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
        d.addErrback(self._bad, vtp)
        return d

    def start(self):
        return self._try_the_next_one()
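
# Illustrative sketch (hypothetical variable names): the failover pattern
# above walks a list of IValidatedThingProxy instances, trying each one's
# start() until one validates or the list is exhausted:
#
#     proxies = [ValidatedExtendedURIProxy(b, verifycap) for (sn, b) in buckets]
#     vto = ValidatedThingObtainer(proxies, debugname="ueb", log_id=msgid)
#     d = vto.start()   # fires with the first proxy whose data validates, or
#                       # errbacks with UnableToFetchCriticalDownloadDataError
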
class ValidatedCrypttextHashTreeProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
    its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
    start() method that fires with self once I get a valid one). """
    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._readbucketproxy = readbucketproxy
        self._num_segments = num_segments
        self._fetch_failures = fetch_failures
        self._crypttext_hash_tree = crypttext_hash_tree

    def _validate(self, proposal):
        ct_hashes = dict(list(enumerate(proposal)))
        try:
            self._crypttext_hash_tree.set_hashes(ct_hashes)
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            if self._fetch_failures is not None:
                self._fetch_failures["crypttext_hash_tree"] += 1
            raise BadOrMissingHash(le)
        # If we now have enough of the crypttext hash tree to integrity-check *any* segment of ciphertext, then we are done.
        # TODO: It would have better alacrity if we downloaded only part of the crypttext hash tree at a time.
        for segnum in range(self._num_segments):
            if self._crypttext_hash_tree.needed_hashes(segnum):
                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
        return self

    def start(self):
        d = self._readbucketproxy.get_crypttext_hashes()
        d.addCallback(self._validate)
        return d

class ValidatedExtendedURIProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
    retrieving and validating the elements from the UEB. """

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        h = hashutil.uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        return data

    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB. First, things that we
        # really need to learn from the UEB in order to do this download. Next: things which are
        # optional but not redundant -- if they are present in the UEB they will get used. Next,
        # things that are optional and redundant. These things are required to be consistent:
        # they don't have to be in the UEB, but if they are in the UEB then they will be checked
        # for consistency with the already-known facts, and if they are inconsistent then an
        # exception will be raised. These things aren't actually used -- they are just tested
        # for consistency and ignored. Finally: things which are deprecated -- they ought not be
        # in the UEB at all, and if they are present then a warning will be logged but they are
        # otherwise ignored.

        # First, things that we really need to learn from the UEB: segment_size,
        # crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
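        # Worked example (illustrative numbers, not from any real file): with
        # size=10000, segment_size=4096, and needed_shares=3:
        #   block_size        = div_ceil(4096, 3)      = 1366
        #   num_segments      = div_ceil(10000, 4096)  = 3
        #   tail_data_size    = 10000 % 4096           = 1808
        #   tail_segment_size = next_multiple(1808, 3) = 1809
        # so the tail segment carries one byte of erasure-code padding.
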
        # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
        # matches this read-cap or verify-cap. The integrity check on the shares is not
        # sufficient to prevent the original encoder from creating some shares of file A and
        # other shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']

        # Next: things that are optional and not redundant: crypttext_hash
        if d.has_key('crypttext_hash'):
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))

        # Next: things that are optional, redundant, and required to be consistent: codec_name,
        # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
        if d.has_key('codec_name'):
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if d.has_key('codec_params'):
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
                                      "self.segment_size: %s" % (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (ucpns,
                                                                             self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" % (ucpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('tail_codec_params'):
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('num_segments'):
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if d.has_key('size'):
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if d.has_key('needed_shares'):
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if d.has_key('total_shares'):
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used. Ignoring. %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used. Ignoring. %s" % (self,))

        return self

    def start(self):
        """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
        it. Returns a deferred which is called back with self once the fetch is successful, or
        is erred back if it fails. """
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d

class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for retrieving and validating
    data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
        """ share_hash_tree is required to have already been initialized with the root hash
        (the number-0 hash), using the share_root_hash from the UEB """
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

    def get_block(self, blocknum):
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise an exception if the values being passed do not match the root
                # node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of the range that fit
                    # into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree, which is also one of
            # the leafs of the share hash tree, and is called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = hashutil.block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #        "%r .. %r: %s" %
            #        (self.sharenum, blocknum, len(blockdata),
            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                         (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata
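
# Illustrative sketch of the hashtree.IncompleteHashTree protocol relied on
# above (the same calls this module makes; the values are hypothetical):
#
#     ht = hashtree.IncompleteHashTree(4)             # a tree over 4 leaves
#     ht.set_hashes({0: trusted_root})                # seed the trust anchor
#     needed = ht.needed_hashes(2, include_leaf=True) # node numbers to fetch
#     ht.set_hashes(fetched_nodes, leaves={2: leaf})  # raises BadHashError or
#                                                     # NotEnoughHashesError on bad input
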
class BlockDownloader(log.PrefixingLogMixin):
    """I am responsible for downloading a single block (from a single bucket)
    for a single segment.

    I am a child of the SegmentDownloader.
    """

    def __init__(self, vbucket, blocknum, parent, results):
        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
        prefix = "%s-%d" % (vbucket, blocknum)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.vbucket = vbucket
        self.blocknum = blocknum
        self.parent = parent
        self.results = results

    def start(self, segnum):
        self.log("get_block(segnum=%d)" % segnum)
        started = time.time()
        d = self.vbucket.get_block(segnum)
        d.addCallbacks(self._hold_block, self._got_block_error,
                       callbackArgs=(started,))
        return d

    def _hold_block(self, data, started):
        if self.results:
            elapsed = time.time() - started
            peerid = self.vbucket.bucket.get_peerid()
            if peerid not in self.results.timings["fetch_per_server"]:
                self.results.timings["fetch_per_server"][peerid] = []
            self.results.timings["fetch_per_server"][peerid].append(elapsed)
        self.log("got block")
        self.parent.hold_block(self.blocknum, data)

    def _got_block_error(self, f):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject,
                          layout.LayoutInvalid, layout.ShareVersionIncompatible)
        if f.check(RemoteException, DeadReferenceError):
            level = log.UNUSUAL
        else:
            level = log.WEIRD
        self.log("failure to get block", level=level, umid="5Z4uHQ")
        if self.results:
            peerid = self.vbucket.bucket.get_peerid()
            self.results.server_problems[peerid] = str(f)
        self.parent.bucket_failed(self.vbucket)

class SegmentDownloader:
    """I am responsible for downloading all the blocks for a single segment
    of data.

    I am a child of the CiphertextDownloader.
    """

    def __init__(self, parent, segmentnumber, needed_shares, results):
        self.parent = parent
        self.segmentnumber = segmentnumber
        self.needed_blocks = needed_shares
        self.blocks = {} # k: blocknum, v: data
        self.results = results
        self._log_number = self.parent.log("starting segment %d" %
                                           (segmentnumber,))

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return self.parent.log(*args, **kwargs)

    def start(self):
        return self._download()

    def _download(self):
        d = self._try()
        def _done(res):
            if len(self.blocks) >= self.needed_blocks:
                # we only need self.needed_blocks blocks
                # we want to get the smallest blockids, because they are
                # more likely to be fast "primary blocks"
                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
                blocks = []
                for blocknum in blockids:
                    blocks.append(self.blocks[blocknum])
                return (blocks, blockids)
            else:
                return self._download()
        d.addCallback(_done)
        return d

    def _try(self):
        # fill our set of active buckets, maybe raising NotEnoughSharesError
        active_buckets = self.parent._activate_enough_buckets()
        # Now we have enough buckets, in self.parent.active_buckets.

        # in test cases, bd.start might mutate active_buckets right away, so
        # we need to put off calling start() until we've iterated all the way
        # through it.
        downloaders = []
        for blocknum, vbucket in active_buckets.iteritems():
            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
            bd = BlockDownloader(vbucket, blocknum, self, self.results)
            downloaders.append(bd)
            if self.results:
                self.results.servers_used.add(vbucket.bucket.get_peerid())
        l = [bd.start(self.segmentnumber) for bd in downloaders]
        return defer.DeferredList(l, fireOnOneErrback=True)

    def hold_block(self, blocknum, data):
        self.blocks[blocknum] = data

    def bucket_failed(self, vbucket):
        self.parent.bucket_failed(vbucket)

class DownloadStatus:
    implements(IDownloadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = 0.0
        self.paused = False
        self.stopped = False
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        status = self.status
        if self.paused:
            status += " (output paused)"
        if self.stopped:
            status += " (output stopped)"
        return status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_paused(self, paused):
        self.paused = paused
    def set_stopped(self, stopped):
        self.stopped = stopped
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CiphertextDownloader(log.PrefixingLogMixin):
    """ I download shares, check their integrity, then decode them, check the integrity of the
    resulting ciphertext, then write it to my target. Before I send any new request to a
    server, I always ask the "monitor" object that was passed into my constructor whether this
    task has been cancelled (by invoking its raise_if_cancelled() method). """
    implements(IPushProducer)

    def __init__(self, client, v, target, monitor):
        precondition(IVerifierURI.providedBy(v), v)
        precondition(IDownloadTarget.providedBy(target), target)

        prefix = base32.b2a_l(v.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self._client = client

        self._verifycap = v
        self._storage_index = v.storage_index
        self._uri_extension_hash = v.uri_extension_hash

        self._started = time.time()
        self._status = s = DownloadStatus()
        s.set_status("Starting")
        s.set_storage_index(self._storage_index)
        s.set_size(self._verifycap.size)

        self._results = DownloadResults()
        s.set_results(self._results)
        self._results.file_size = self._verifycap.size
        self._results.timings["servers_peer_selection"] = {}
        self._results.timings["fetch_per_server"] = {}
        self._results.timings["cumulative_fetch"] = 0.0
        self._results.timings["cumulative_decode"] = 0.0
        self._results.timings["cumulative_decrypt"] = 0.0
        self._results.timings["paused"] = 0.0

        self._paused = False
        self._stopped = False
        if IConsumer.providedBy(target):
            target.registerProducer(self, True)
        self._target = target
        self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
        self._monitor = monitor
        self._opened = False

        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets

        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }

        self._ciphertext_hasher = hashutil.crypttext_hasher()

        self._bytes_done = 0
        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)

        # _got_uri_extension() will create the following:
        # self._crypttext_hash_tree
        # self._share_hash_tree
        # self._current_segnum = 0
        # self._vup # ValidatedExtendedURIProxy

    def pauseProducing(self):
        if self._paused:
            return
        self._paused = defer.Deferred()
        self._paused_at = time.time()
        if self._status:
            self._status.set_paused(True)

    def resumeProducing(self):
        if self._paused:
            paused_for = time.time() - self._paused_at
            self._results.timings['paused'] += paused_for
            p = self._paused
            self._paused = None
            eventually(p.callback, None)
        if self._status:
            self._status.set_paused(False)

    def stopProducing(self):
        self.log("Download.stopProducing")
        self._stopped = True
        self.resumeProducing()
        if self._status:
            self._status.set_stopped(True)
            self._status.set_active(False)
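
    # Illustrative sketch (hypothetical consumer, not part of this module):
    # the three IPushProducer methods above are driven by whatever IConsumer
    # we registered with in __init__. A consumer that falls behind can
    # throttle the download, because _check_for_pause() below stalls the
    # segment pipeline until resumeProducing() fires self._paused:
    #
    #     class ThrottlingConsumer:
    #         implements(IConsumer)
    #         def registerProducer(self, producer, streaming):
    #             self._producer = producer
    #         def write(self, data):
    #             self._producer.pauseProducing()
    #             reactor.callLater(1, self._producer.resumeProducing)
    #         def unregisterProducer(self):
    #             self._producer = None
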
    def start(self):
        self.log("starting download")

        # first step: who should we download from?
        d = defer.maybeDeferred(self._get_all_shareholders)
        d.addCallback(self._got_all_shareholders)
        # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
        d.addCallback(self._obtain_uri_extension)
        d.addCallback(self._get_crypttext_hash_tree)
        # once we know that, we can download blocks from everybody
        d.addCallback(self._download_all_segments)
        def _finished(res):
            self._status.set_status("Finished")
            self._status.set_active(False)
            self._status.set_paused(False)
            if IConsumer.providedBy(self._target):
                self._target.unregisterProducer()
            return res
        d.addCallback(_finished)
        def _failed(why):
            self._status.set_status("Failed")
            self._status.set_active(False)
            if why.check(DownloadStopped):
                # DownloadStopped just means the consumer aborted the download; not so scary.
                self.log("download stopped", level=log.UNUSUAL)
            else:
                # This is really unusual, and deserves maximum forensics.
                self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
            return why
        d.addErrback(_failed)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self):
        dl = []
        for (peerid,ss) in self._client.get_permuted_peers("storage",
                                                           self._storage_index):
            d = ss.callRemote("get_buckets", self._storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        self._responses_received = 0
        self._queries_sent = len(dl)
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        self._responses_received += 1
        if self._results:
            elapsed = time.time() - self._started
            self._results.timings["servers_peer_selection"][peerid] = elapsed
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        for sharenum, bucket in buckets.iteritems():
            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
            self.add_share_bucket(sharenum, b)

            if self._results:
                if peerid not in self._results.servermap:
                    self._results.servermap[peerid] = set()
                self._results.servermap[peerid].add(sharenum)

    def add_share_bucket(self, sharenum, bucket):
        # this is split out for the benefit of test_encode.py
        self._share_buckets.append( (sharenum, bucket) )

    def _got_error(self, f):
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log("Error during get_buckets", failure=f, level=level)

    def bucket_failed(self, vbucket):
        shnum = vbucket.sharenum
        del self.active_buckets[shnum]
        s = self._share_vbuckets[shnum]
        # s is a set of ValidatedReadBucketProxy instances
        s.discard(vbucket)
        # ... which might now be empty
        if not s:
            # there are no more buckets which can provide this share, so
            # remove the key. This may prompt us to use a different share.
            del self._share_vbuckets[shnum]

    def _got_all_shareholders(self, res):
        now = time.time()
        if self._results:
            self._results.timings["peer_selection"] = now - self._started

        if len(self._share_buckets) < self._verifycap.needed_shares:
            raise NotEnoughSharesError("Failed to get enough shareholders",
                                       len(self._share_buckets),
                                       self._verifycap.needed_shares)

        #for s in self._share_vbuckets.values():
        #    for vb in s:
        #        assert isinstance(vb, ValidatedReadBucketProxy), \
        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)

    def _obtain_uri_extension(self, ignored):
        # all shareholders are supposed to have a copy of uri_extension, and
        # all are supposed to be identical. We compute the hash of the data
        # that comes back, and compare it against the version in our URI. If
        # they don't match, ignore their data and try someone else.
        if self._status:
            self._status.set_status("Obtaining URI Extension")

        uri_extension_fetch_started = time.time()

        vups = []
        for sharenum, bucket in self._share_buckets:
            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
        d = vto.start()

        def _got_uri_extension(vup):
            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
            if self._results:
                elapsed = time.time() - uri_extension_fetch_started
                self._results.timings["uri_extension"] = elapsed

            self._vup = vup
            self._codec = codec.CRSDecoder()
            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
            self._tail_codec = codec.CRSDecoder()
            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)

            self._current_segnum = 0

            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})

            # Repairer (uploader) needs the encodingparams.
            self._target.set_encodingparams((
                self._verifycap.needed_shares,
                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
                self._verifycap.total_shares,
                self._vup.segment_size
                ))
        d.addCallback(_got_uri_extension)
        return d

    def _get_crypttext_hash_tree(self, res):
        vchtps = []
        for sharenum, bucket in self._share_buckets:
            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
            vchtps.append(vchtp)

        _get_crypttext_hash_tree_started = time.time()
        if self._status:
            self._status.set_status("Retrieving crypttext hash tree")

        vto = ValidatedThingObtainer(vchtps, debugname="vchtps", log_id=self._parentmsgid)
        d = vto.start()

        def _got_crypttext_hash_tree(res):
            # Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated
            # with hashes.
            if self._results:
                elapsed = time.time() - _get_crypttext_hash_tree_started
                self._results.timings["hashtrees"] = elapsed
        d.addCallback(_got_crypttext_hash_tree)
        return d

    def _activate_enough_buckets(self):
        """either return a mapping from shnum to a ValidatedReadBucketProxy that can
        provide data for that share, or raise NotEnoughSharesError"""

        while len(self.active_buckets) < self._verifycap.needed_shares:
            # need some more
            handled_shnums = set(self.active_buckets.keys())
            available_shnums = set(self._share_vbuckets.keys())
            potential_shnums = list(available_shnums - handled_shnums)
            if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
                have = len(potential_shnums) + len(self.active_buckets)
                raise NotEnoughSharesError("Unable to activate enough shares",
                                           have, self._verifycap.needed_shares)
            # For the next share, choose a primary share if available, else a randomly chosen
            # secondary share.
            potential_shnums.sort()
            if potential_shnums[0] < self._verifycap.needed_shares:
                shnum = potential_shnums[0]
            else:
                shnum = random.choice(potential_shnums)
            # and a random bucket that will provide it
            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
            self.active_buckets[shnum] = validated_bucket
        return self.active_buckets
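
    # Worked example (illustrative numbers): with needed_shares=3 and
    # self._share_vbuckets holding shnums {1, 5, 7}, the loop above first
    # activates shnum 1 -- a "primary" share, since 1 < 3, and primary shares
    # are the fast ones to decode, per the comment in SegmentDownloader --
    # then falls back to a random choice between the "secondary" shares 5
    # and 7 for the remaining slots.
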
    def _download_all_segments(self, res):
        for sharenum, bucket in self._share_buckets:
            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        # after the above code, self._share_vbuckets contains enough
        # buckets to complete the download, and some extra ones to
        # tolerate some buckets dropping out or having
        # errors. self._share_vbuckets is a dictionary that maps from
        # shnum to a set of ValidatedBuckets, which themselves are
        # wrappers around RIBucketReader references.
        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance

        self._started_fetching = time.time()

        d = defer.succeed(None)
        for segnum in range(self._vup.num_segments):
            d.addCallback(self._download_segment, segnum)
            # this pause, at the end of write, prevents pre-fetch from
            # happening until the consumer is ready for more data.
            d.addCallback(self._check_for_pause)
        return d

    def _check_for_pause(self, res):
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ignored: d.callback(res))
            return d
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        self._monitor.raise_if_cancelled()
        return res

    def _download_segment(self, res, segnum):
        if self._status:
            self._status.set_status("Downloading segment %d of %d" %
                                    (segnum+1, self._vup.num_segments))
        self.log("downloading seg#%d of %d (%d%%)"
                 % (segnum, self._vup.num_segments,
                    100.0 * segnum / self._vup.num_segments))
        # memory footprint: when the SegmentDownloader finishes pulling down
        # all shares, we have 1*segment_size of usage.
        segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
                                        self._results)
        started = time.time()
        d = segmentdler.start()
        def _finished_fetching(res):
            elapsed = time.time() - started
            self._results.timings["cumulative_fetch"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_fetching)
        # pause before using more memory
        d.addCallback(self._check_for_pause)
        # while the codec does its job, we hit 2*segment_size
        def _started_decode(res):
            self._started_decode = time.time()
            return res
        if self._results:
            d.addCallback(_started_decode)
        if segnum + 1 == self._vup.num_segments:
            codec = self._tail_codec
        else:
            codec = self._codec
        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
        # once the codec is done, we drop back to 1*segment_size, because
        # 'shares' goes out of scope. The memory usage is all in the
        # plaintext now, spread out into a bunch of tiny buffers.
        def _finished_decode(res):
            elapsed = time.time() - self._started_decode
            self._results.timings["cumulative_decode"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_decode)

        # pause/check-for-stop just before writing, to honor stopProducing
        d.addCallback(self._check_for_pause)
        d.addCallback(self._got_segment)
        return d

    def _got_segment(self, buffers):
        precondition(self._crypttext_hash_tree)
        started_decrypt = time.time()
        self._status.set_progress(float(self._current_segnum)/self._vup.num_segments)

        if self._current_segnum + 1 == self._vup.num_segments:
            # This is the last segment.
            # Trim off any padding added by the upload side. We never send empty segments. If
            # the data was an exact multiple of the segment size, the last segment will be full.
            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
            # Remove buffers which don't contain any part of the tail.
            del buffers[num_buffers_used:]
            # Remove the past-the-tail-part of the last buffer.
            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
            if tail_in_last_buf == 0:
                tail_in_last_buf = tail_buf_size
            buffers[-1] = buffers[-1][:tail_in_last_buf]
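            # Worked example (continuing the illustrative numbers from
            # _parse_and_validate): tail_segment_size=1809, needed_shares=3,
            # tail_data_size=1808 gives:
            #   tail_buf_size    = div_ceil(1809, 3)   = 603
            #   num_buffers_used = div_ceil(1808, 603) = 3   (keep all 3 buffers)
            #   tail_in_last_buf = 1808 % 603          = 602 (trim the 1 pad byte)
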
        # First compute the hash of this segment and check that it fits.
        ch = hashutil.crypttext_segment_hasher()
        for buffer in buffers:
            self._ciphertext_hasher.update(buffer)
            ch.update(buffer)
        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})

        # Then write this segment to the target.
        if not self._opened:
            self._opened = True
            self._target.open(self._verifycap.size)

        for buffer in buffers:
            self._target.write(buffer)
            self._bytes_done += len(buffer)

        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
        self._current_segnum += 1

        if self._results:
            elapsed = time.time() - started_decrypt
            self._results.timings["cumulative_decrypt"] += elapsed

    def _done(self, res):
        self.log("download done")
        if self._results:
            now = time.time()
            self._results.timings["total"] = now - self._started
            self._results.timings["segments"] = now - self._started_fetching
        if self._vup.crypttext_hash:
            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
                    "bad crypttext_hash: computed=%s, expected=%s" %
                    (base32.b2a(self._ciphertext_hasher.digest()),
                     base32.b2a(self._vup.crypttext_hash)))
        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
        self._status.set_progress(1)
        self._target.close()
        return self._target.finish()

    def get_download_status(self):
        return self._status

class FileName:
    implements(IDownloadTarget)
    def __init__(self, filename):
        self._filename = filename
        self.f = None
    def open(self, size):
        self.f = open(self._filename, "wb")
        return self.f
    def write(self, data):
        self.f.write(data)
    def close(self):
        if self.f:
            self.f.close()
    def fail(self, why):
        if self.f:
            self.f.close()
            os.unlink(self._filename)
    def register_canceller(self, cb):
        pass # we won't use it
    def finish(self):
        pass
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class Data:
    implements(IDownloadTarget)
    def __init__(self):
        self._data = []
    def open(self, size):
        pass
    def write(self, data):
        self._data.append(data)
    def close(self):
        self.data = "".join(self._data)
        del self._data
    def fail(self, why):
        del self._data
    def register_canceller(self, cb):
        pass # we won't use it
    def finish(self):
        return self.data
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class FileHandle:
    """Use me to download data to a pre-defined filehandle-like object. I
    will use the target's write() method. I will *not* close the filehandle:
    I leave that up to the originator of the filehandle. The download process
    will return the filehandle when it completes.
    """
    implements(IDownloadTarget)
    def __init__(self, filehandle):
        self._filehandle = filehandle
    def open(self, size):
        pass
    def write(self, data):
        self._filehandle.write(data)
    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._filehandle
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class ConsumerAdapter:
    implements(IDownloadTarget, IConsumer)
    def __init__(self, consumer):
        self._consumer = consumer

    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def open(self, size):
        pass
    def write(self, data):
        self._consumer.write(data)
    def close(self):
        pass

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._consumer
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class Downloader(service.MultiService):
    """I am a service that allows file downloading.
    """
    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
    # It is scheduled to go away, to be replaced by filenode.download()
    implements(IDownloader)
    name = "downloader"

    def __init__(self, stats_provider=None):
        service.MultiService.__init__(self)
        self.stats_provider = stats_provider
        self._all_downloads = weakref.WeakKeyDictionary() # for debugging

    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
        assert self.parent
        assert self.running
        u = IFileURI(u)
        t = IDownloadTarget(t)
        assert t.write
        assert t.close

        assert isinstance(u, uri.CHKFileURI)
        if self.stats_provider:
            # these counters are meant for network traffic, and don't
            # include LIT files
            self.stats_provider.count('downloader.files_downloaded', 1)
            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())

        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
        if not monitor:
            monitor = Monitor()
        dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target, monitor=monitor)
        self._all_downloads[dl] = None
        if history:
            history.add_download(dl.get_download_status())
        d = dl.start()
        return d

    def download_to_data(self, uri, _log_msg_id=None, history=None):
        return self.download(uri, Data(), _log_msg_id=_log_msg_id, history=history)
    def download_to_filename(self, uri, filename, _log_msg_id=None):
        return self.download(uri, FileName(filename), _log_msg_id=_log_msg_id)
    def download_to_filehandle(self, uri, filehandle, _log_msg_id=None):
        return self.download(uri, FileHandle(filehandle), _log_msg_id=_log_msg_id)
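
# Example usage (illustrative sketch; assumes a running Tahoe client node as
# the service parent, since CiphertextDownloader calls
# self._client.get_permuted_peers() -- 'client' and 'some_chk_uri' are
# hypothetical here):
#
#     downloader = Downloader(stats_provider=None)
#     downloader.setServiceParent(client)
#     d = downloader.download_to_data(some_chk_uri)
#     d.addCallback(lambda plaintext: do_something_with(plaintext))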