From: Brian Warner Date: Mon, 5 Oct 2009 19:25:42 +0000 (-0700) Subject: immutable/download.py: wrap to 80cols, no functional changes X-Git-Tag: trac-4100~33 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/%22doc.html/%3C?a=commitdiff_plain;h=19d336513c27ab31aa8184f947aacffc4d92146e;p=tahoe-lafs%2Ftahoe-lafs.git immutable/download.py: wrap to 80cols, no functional changes --- diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index cd185bea..7c55666a 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -67,9 +67,10 @@ class DecryptingTarget(log.PrefixingLogMixin): self.target.close() def finish(self): return self.target.finish() - # The following methods is just to pass through to the next target, and just because that - # target might be a repairer.DownUpConnector, and just because the current CHKUpload object - # expects to find the storage index in its Uploadable. + # The following methods is just to pass through to the next target, and + # just because that target might be a repairer.DownUpConnector, and just + # because the current CHKUpload object expects to find the storage index + # in its Uploadable. def set_storageindex(self, storageindex): self.target.set_storageindex(storageindex) def set_encodingparams(self, encodingparams): @@ -104,7 +105,9 @@ class ValidatedThingObtainer: def _try_the_next_one(self): vtp = self._validatedthingproxies.pop(0) - d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks + # start() obtains, validates, and callsback-with the thing or else + # errbacks + d = vtp.start() d.addErrback(self._bad, vtp) return d @@ -113,10 +116,12 @@ class ValidatedThingObtainer: class ValidatedCrypttextHashTreeProxy: implements(IValidatedThingProxy) - """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use - its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a - start() method that fires with self once I get a valid one). """ - def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None): + """ I am a front-end for a remote crypttext hash tree using a local + ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the + Validated Thing protocol (i.e., I have a start() method that fires with + self once I get a valid one).""" + def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, + fetch_failures=None): # fetch_failures is for debugging -- see test_encode.py self._readbucketproxy = readbucketproxy self._num_segments = num_segments @@ -131,8 +136,10 @@ class ValidatedCrypttextHashTreeProxy: if self._fetch_failures is not None: self._fetch_failures["crypttext_hash_tree"] += 1 raise BadOrMissingHash(le) - # If we now have enough of the crypttext hash tree to integrity-check *any* segment of ciphertext, then we are done. - # TODO: It would have better alacrity if we downloaded only part of the crypttext hash tree at a time. + # If we now have enough of the crypttext hash tree to integrity-check + # *any* segment of ciphertext, then we are done. TODO: It would have + # better alacrity if we downloaded only part of the crypttext hash + # tree at a time. for segnum in range(self._num_segments): if self._crypttext_hash_tree.needed_hashes(segnum): raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,)) @@ -145,8 +152,8 @@ class ValidatedCrypttextHashTreeProxy: class ValidatedExtendedURIProxy: implements(IValidatedThingProxy) - """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for - retrieving and validating the elements from the UEB. """ + """ I am a front-end for a remote UEB (using a local ReadBucketProxy), + responsible for retrieving and validating the elements from the UEB.""" def __init__(self, readbucketproxy, verifycap, fetch_failures=None): # fetch_failures is for debugging -- see test_encode.py @@ -177,7 +184,9 @@ class ValidatedExtendedURIProxy: h = hashutil.uri_extension_hash(data) if h != self._verifycap.uri_extension_hash: msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" % - (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h))) + (self._readbucketproxy, + base32.b2a(self._verifycap.uri_extension_hash), + base32.b2a(h))) if self._fetch_failures is not None: self._fetch_failures["uri_extension"] += 1 raise BadURIExtensionHashValue(msg) @@ -185,38 +194,46 @@ class ValidatedExtendedURIProxy: return data def _parse_and_validate(self, data): - self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares) + self.share_size = mathutil.div_ceil(self._verifycap.size, + self._verifycap.needed_shares) d = uri.unpack_extension(data) - # There are several kinds of things that can be found in a UEB. First, things that we - # really need to learn from the UEB in order to do this download. Next: things which are - # optional but not redundant -- if they are present in the UEB they will get used. Next, - # things that are optional and redundant. These things are required to be consistent: - # they don't have to be in the UEB, but if they are in the UEB then they will be checked - # for consistency with the already-known facts, and if they are inconsistent then an - # exception will be raised. These things aren't actually used -- they are just tested - # for consistency and ignored. Finally: things which are deprecated -- they ought not be - # in the UEB at all, and if they are present then a warning will be logged but they are - # otherwise ignored. - - # First, things that we really need to learn from the UEB: segment_size, - # crypttext_root_hash, and share_root_hash. + # There are several kinds of things that can be found in a UEB. + # First, things that we really need to learn from the UEB in order to + # do this download. Next: things which are optional but not redundant + # -- if they are present in the UEB they will get used. Next, things + # that are optional and redundant. These things are required to be + # consistent: they don't have to be in the UEB, but if they are in + # the UEB then they will be checked for consistency with the + # already-known facts, and if they are inconsistent then an exception + # will be raised. These things aren't actually used -- they are just + # tested for consistency and ignored. Finally: things which are + # deprecated -- they ought not be in the UEB at all, and if they are + # present then a warning will be logged but they are otherwise + # ignored. + + # First, things that we really need to learn from the UEB: + # segment_size, crypttext_root_hash, and share_root_hash. self.segment_size = d['segment_size'] - self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares) - self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size) + self.block_size = mathutil.div_ceil(self.segment_size, + self._verifycap.needed_shares) + self.num_segments = mathutil.div_ceil(self._verifycap.size, + self.segment_size) self.tail_data_size = self._verifycap.size % self.segment_size if not self.tail_data_size: self.tail_data_size = self.segment_size # padding for erasure code - self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares) - - # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that - # matches this read-cap or verify-cap. The integrity check on the shares is not - # sufficient to prevent the original encoder from creating some shares of file A and - # other shares of file B. + self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, + self._verifycap.needed_shares) + + # Ciphertext hash tree root is mandatory, so that there is at most + # one ciphertext that matches this read-cap or verify-cap. The + # integrity check on the shares is not sufficient to prevent the + # original encoder from creating some shares of file A and other + # shares of file B. self.crypttext_root_hash = d['crypttext_root_hash'] self.share_root_hash = d['share_root_hash'] @@ -229,8 +246,9 @@ class ValidatedExtendedURIProxy: raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),)) - # Next: things that are optional, redundant, and required to be consistent: codec_name, - # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares + # Next: things that are optional, redundant, and required to be + # consistent: codec_name, codec_params, tail_codec_params, + # num_segments, size, needed_shares, total_shares if d.has_key('codec_name'): if d['codec_name'] != "crs": raise UnsupportedErasureCodec(d['codec_name']) @@ -238,16 +256,17 @@ class ValidatedExtendedURIProxy: if d.has_key('codec_params'): ucpss, ucpns, ucpts = codec.parse_params(d['codec_params']) if ucpss != self.segment_size: - raise BadURIExtension("inconsistent erasure code params: ucpss: %s != " - "self.segment_size: %s" % (ucpss, self.segment_size)) + raise BadURIExtension("inconsistent erasure code params: " + "ucpss: %s != self.segment_size: %s" % + (ucpss, self.segment_size)) if ucpns != self._verifycap.needed_shares: raise BadURIExtension("inconsistent erasure code params: ucpns: %s != " - "self._verifycap.needed_shares: %s" % (ucpns, - self._verifycap.needed_shares)) + "self._verifycap.needed_shares: %s" % + (ucpns, self._verifycap.needed_shares)) if ucpts != self._verifycap.total_shares: raise BadURIExtension("inconsistent erasure code params: ucpts: %s != " - "self._verifycap.total_shares: %s" % (ucpts, - self._verifycap.total_shares)) + "self._verifycap.total_shares: %s" % + (ucpts, self._verifycap.total_shares)) if d.has_key('tail_codec_params'): utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params']) @@ -291,7 +310,8 @@ class ValidatedExtendedURIProxy: "total shares: %s" % (self._verifycap.total_shares, d['total_shares'])) - # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash + # Finally, things that are deprecated and ignored: plaintext_hash, + # plaintext_root_hash if d.get('plaintext_hash'): log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons " "and is no longer used. Ignoring. %s" % (self,)) @@ -302,27 +322,33 @@ class ValidatedExtendedURIProxy: return self def start(self): - """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse - it. Returns a deferred which is called back with self once the fetch is successful, or - is erred back if it fails. """ + """Fetch the UEB from bucket, compare its hash to the hash from + verifycap, then parse it. Returns a deferred which is called back + with self once the fetch is successful, or is erred back if it + fails.""" d = self._readbucketproxy.get_uri_extension() d.addCallback(self._check_integrity) d.addCallback(self._parse_and_validate) return d class ValidatedReadBucketProxy(log.PrefixingLogMixin): - """I am a front-end for a remote storage bucket, responsible for retrieving and validating - data from that bucket. + """I am a front-end for a remote storage bucket, responsible for + retrieving and validating data from that bucket. My get_block() method is used by BlockDownloaders. """ - def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size): - """ share_hash_tree is required to have already been initialized with the root hash - (the number-0 hash), using the share_root_hash from the UEB """ + def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, + block_size, share_size): + """ share_hash_tree is required to have already been initialized with + the root hash (the number-0 hash), using the share_root_hash from the + UEB""" precondition(share_hash_tree[0] is not None, share_hash_tree) - prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60)) - log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) + prefix = "%d-%s-%s" % (sharenum, bucket, + base32.b2a_l(share_hash_tree[0][:8], 60)) + log.PrefixingLogMixin.__init__(self, + facility="tahoe.immutable.download", + prefix=prefix) self.sharenum = sharenum self.bucket = bucket self.share_hash_tree = share_hash_tree @@ -343,7 +369,8 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin): # We might need to grab some elements of our block hash tree, to # validate the requested block up to the share hash. blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True) - # We don't need the root of the block hash tree, as that comes in the share tree. + # We don't need the root of the block hash tree, as that comes in the + # share tree. blockhashesneeded.discard(0) d2 = self.bucket.get_block_hashes(blockhashesneeded) @@ -353,14 +380,16 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin): thisblocksize = self.share_size % self.block_size if thisblocksize == 0: thisblocksize = self.block_size - d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize) + d3 = self.bucket.get_block_data(blocknum, + self.block_size, thisblocksize) dl = deferredutil.gatherResults([d1, d2, d3]) dl.addCallback(self._got_data, blocknum) return dl def _got_data(self, results, blocknum): - precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks) + precondition(blocknum < self.num_blocks, + self, blocknum, self.num_blocks) sharehashes, blockhashes, blockdata = results try: sharehashes = dict(sharehashes) @@ -374,22 +403,25 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin): try: if self.share_hash_tree.needed_hashes(self.sharenum): - # This will raise exception if the values being passed do not match the root - # node of self.share_hash_tree. + # This will raise exception if the values being passed do not + # match the root node of self.share_hash_tree. try: self.share_hash_tree.set_hashes(sharehashes) except IndexError, le: - # Weird -- sharehashes contained index numbers outside of the range that fit - # into this hash tree. + # Weird -- sharehashes contained index numbers outside of + # the range that fit into this hash tree. raise BadOrMissingHash(le) - # To validate a block we need the root of the block hash tree, which is also one of - # the leafs of the share hash tree, and is called "the share hash". + # To validate a block we need the root of the block hash tree, + # which is also one of the leafs of the share hash tree, and is + # called "the share hash". if not self.block_hash_tree[0]: # empty -- no root node yet # Get the share hash from the share hash tree. share_hash = self.share_hash_tree.get_leaf(self.sharenum) if not share_hash: - raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server. + # No root node in block_hash_tree and also the share hash + # wasn't sent by the server. + raise hashtree.NotEnoughHashesError self.block_hash_tree.set_hashes({0: share_hash}) if self.block_hash_tree.needed_hashes(blocknum): @@ -662,7 +694,8 @@ class CiphertextDownloader(log.PrefixingLogMixin): if IConsumer.providedBy(target): target.registerProducer(self, True) self._target = target - self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex. + # Repairer (uploader) needs the storageindex. + self._target.set_storageindex(self._storage_index) self._monitor = monitor self._opened = False @@ -715,7 +748,8 @@ class CiphertextDownloader(log.PrefixingLogMixin): # first step: who should we download from? d = defer.maybeDeferred(self._get_all_shareholders) d.addCallback(self._got_all_shareholders) - # now get the uri_extension block from somebody and integrity check it and parse and validate its contents + # now get the uri_extension block from somebody and integrity check + # it and parse and validate its contents d.addCallback(self._obtain_uri_extension) d.addCallback(self._get_crypttext_hash_tree) # once we know that, we can download blocks from everybody @@ -734,11 +768,13 @@ class CiphertextDownloader(log.PrefixingLogMixin): self._status.set_status("Failed") self._status.set_active(False) if why.check(DownloadStopped): - # DownloadStopped just means the consumer aborted the download; not so scary. + # DownloadStopped just means the consumer aborted the + # download; not so scary. self.log("download stopped", level=log.UNUSUAL) else: # This is really unusual, and deserves maximum forensics. - self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ") + self.log("download failed!", failure=why, level=log.SCARY, + umid="lp1vaQ") return why d.addErrback(_failed) d.addCallback(self._done) @@ -885,12 +921,13 @@ class CiphertextDownloader(log.PrefixingLogMixin): if self._status: self._status.set_status("Retrieving crypttext hash tree") - vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid) + vto = ValidatedThingObtainer(vchtps, debugname="vchtps", + log_id=self._parentmsgid) d = vto.start() def _got_crypttext_hash_tree(res): - # Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated - # with hashes. + # Good -- the self._crypttext_hash_tree that we passed to vchtp + # is now populated with hashes. if self._results: elapsed = time.time() - _get_crypttext_hash_tree_started self._results.timings["hashtrees"] = elapsed @@ -898,15 +935,16 @@ class CiphertextDownloader(log.PrefixingLogMixin): return d def _activate_enough_buckets(self): - """either return a mapping from shnum to a ValidatedReadBucketProxy that can - provide data for that share, or raise NotEnoughSharesError""" + """either return a mapping from shnum to a ValidatedReadBucketProxy + that can provide data for that share, or raise NotEnoughSharesError""" while len(self.active_buckets) < self._verifycap.needed_shares: # need some more handled_shnums = set(self.active_buckets.keys()) available_shnums = set(self._share_vbuckets.keys()) potential_shnums = list(available_shnums - handled_shnums) - if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)): + if len(potential_shnums) < (self._verifycap.needed_shares + - len(self.active_buckets)): have = len(potential_shnums) + len(self.active_buckets) msg = "Unable to activate enough shares: have %d, need %d" \ % (have, self._verifycap.needed_shares) @@ -914,8 +952,8 @@ class CiphertextDownloader(log.PrefixingLogMixin): raise NotEnoughSharesError(msg) else: raise NoSharesError(msg) - # For the next share, choose a primary share if available, else a randomly chosen - # secondary share. + # For the next share, choose a primary share if available, else a + # randomly chosen secondary share. potential_shnums.sort() if potential_shnums[0] < self._verifycap.needed_shares: shnum = potential_shnums[0] @@ -969,7 +1007,8 @@ class CiphertextDownloader(log.PrefixingLogMixin): 100.0 * segnum / self._vup.num_segments)) # memory footprint: when the SegmentDownloader finishes pulling down # all shares, we have 1*segment_size of usage. - segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares, + segmentdler = SegmentDownloader(self, segnum, + self._verifycap.needed_shares, self._results) started = time.time() d = segmentdler.start() @@ -1014,8 +1053,9 @@ class CiphertextDownloader(log.PrefixingLogMixin): if self._current_segnum + 1 == self._vup.num_segments: # This is the last segment. - # Trim off any padding added by the upload side. We never send empty segments. If - # the data was an exact multiple of the segment size, the last segment will be full. + # Trim off any padding added by the upload side. We never send + # empty segments. If the data was an exact multiple of the + # segment size, the last segment will be full. tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares) num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size) # Remove buffers which don't contain any part of the tail. @@ -1089,9 +1129,10 @@ class FileName: pass # we won't use it def finish(self): pass - # The following methods are just because the target might be a repairer.DownUpConnector, - # and just because the current CHKUpload object expects to find the storage index and - # encoding parameters in its Uploadable. + # The following methods are just because the target might be a + # repairer.DownUpConnector, and just because the current CHKUpload object + # expects to find the storage index and encoding parameters in its + # Uploadable. def set_storageindex(self, storageindex): pass def set_encodingparams(self, encodingparams): @@ -1114,9 +1155,10 @@ class Data: pass # we won't use it def finish(self): return self.data - # The following methods are just because the target might be a repairer.DownUpConnector, - # and just because the current CHKUpload object expects to find the storage index and - # encoding parameters in its Uploadable. + # The following methods are just because the target might be a + # repairer.DownUpConnector, and just because the current CHKUpload object + # expects to find the storage index and encoding parameters in its + # Uploadable. def set_storageindex(self, storageindex): pass def set_encodingparams(self, encodingparams): @@ -1144,9 +1186,10 @@ class FileHandle: pass def finish(self): return self._filehandle - # The following methods are just because the target might be a repairer.DownUpConnector, - # and just because the current CHKUpload object expects to find the storage index and - # encoding parameters in its Uploadable. + # The following methods are just because the target might be a + # repairer.DownUpConnector, and just because the current CHKUpload object + # expects to find the storage index and encoding parameters in its + # Uploadable. def set_storageindex(self, storageindex): pass def set_encodingparams(self, encodingparams): @@ -1175,9 +1218,10 @@ class ConsumerAdapter: pass def finish(self): return self._consumer - # The following methods are just because the target might be a repairer.DownUpConnector, - # and just because the current CHKUpload object expects to find the storage index and - # encoding parameters in its Uploadable. + # The following methods are just because the target might be a + # repairer.DownUpConnector, and just because the current CHKUpload object + # expects to find the storage index and encoding parameters in its + # Uploadable. def set_storageindex(self, storageindex): pass def set_encodingparams(self, encodingparams):