]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
immutable/download.py: wrap to 80cols, no functional changes
authorBrian Warner <warner@lothar.com>
Mon, 5 Oct 2009 19:25:42 +0000 (12:25 -0700)
committerBrian Warner <warner@lothar.com>
Mon, 5 Oct 2009 19:25:42 +0000 (12:25 -0700)
src/allmydata/immutable/download.py

index cd185bea1da59dce6e38d312e9db34ccd732ec67..7c55666ac8fee83d1cc3872569b259851c8874c0 100644 (file)
@@ -67,9 +67,10 @@ class DecryptingTarget(log.PrefixingLogMixin):
         self.target.close()
     def finish(self):
         return self.target.finish()
-    # The following methods is just to pass through to the next target, and just because that
-    # target might be a repairer.DownUpConnector, and just because the current CHKUpload object
-    # expects to find the storage index in its Uploadable.
+    # The following methods is just to pass through to the next target, and
+    # just because that target might be a repairer.DownUpConnector, and just
+    # because the current CHKUpload object expects to find the storage index
+    # in its Uploadable.
     def set_storageindex(self, storageindex):
         self.target.set_storageindex(storageindex)
     def set_encodingparams(self, encodingparams):
@@ -104,7 +105,9 @@ class ValidatedThingObtainer:
 
     def _try_the_next_one(self):
         vtp = self._validatedthingproxies.pop(0)
-        d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
+        # start() obtains, validates, and callsback-with the thing or else
+        # errbacks
+        d = vtp.start()
         d.addErrback(self._bad, vtp)
         return d
 
@@ -113,10 +116,12 @@ class ValidatedThingObtainer:
 
 class ValidatedCrypttextHashTreeProxy:
     implements(IValidatedThingProxy)
-    """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
-    its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
-    start() method that fires with self once I get a valid one). """
-    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
+    """ I am a front-end for a remote crypttext hash tree using a local
+    ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
+    Validated Thing protocol (i.e., I have a start() method that fires with
+    self once I get a valid one)."""
+    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
+                 fetch_failures=None):
         # fetch_failures is for debugging -- see test_encode.py
         self._readbucketproxy = readbucketproxy
         self._num_segments = num_segments
@@ -131,8 +136,10 @@ class ValidatedCrypttextHashTreeProxy:
             if self._fetch_failures is not None:
                 self._fetch_failures["crypttext_hash_tree"] += 1
             raise BadOrMissingHash(le)
-        # If we now have enough of the crypttext hash tree to integrity-check *any* segment of ciphertext, then we are done.
-        # TODO: It would have better alacrity if we downloaded only part of the crypttext hash tree at a time.
+        # If we now have enough of the crypttext hash tree to integrity-check
+        # *any* segment of ciphertext, then we are done. TODO: It would have
+        # better alacrity if we downloaded only part of the crypttext hash
+        # tree at a time.
         for segnum in range(self._num_segments):
             if self._crypttext_hash_tree.needed_hashes(segnum):
                 raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
@@ -145,8 +152,8 @@ class ValidatedCrypttextHashTreeProxy:
 
 class ValidatedExtendedURIProxy:
     implements(IValidatedThingProxy)
-    """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
-    retrieving and validating the elements from the UEB. """
+    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
+    responsible for retrieving and validating the elements from the UEB."""
 
     def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
         # fetch_failures is for debugging -- see test_encode.py
@@ -177,7 +184,9 @@ class ValidatedExtendedURIProxy:
         h = hashutil.uri_extension_hash(data)
         if h != self._verifycap.uri_extension_hash:
             msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
-                   (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
+                   (self._readbucketproxy,
+                    base32.b2a(self._verifycap.uri_extension_hash),
+                    base32.b2a(h)))
             if self._fetch_failures is not None:
                 self._fetch_failures["uri_extension"] += 1
             raise BadURIExtensionHashValue(msg)
@@ -185,38 +194,46 @@ class ValidatedExtendedURIProxy:
             return data
 
     def _parse_and_validate(self, data):
-        self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)
+        self.share_size = mathutil.div_ceil(self._verifycap.size,
+                                            self._verifycap.needed_shares)
 
         d = uri.unpack_extension(data)
 
-        # There are several kinds of things that can be found in a UEB.  First, things that we
-        # really need to learn from the UEB in order to do this download. Next: things which are
-        # optional but not redundant -- if they are present in the UEB they will get used. Next,
-        # things that are optional and redundant. These things are required to be consistent:
-        # they don't have to be in the UEB, but if they are in the UEB then they will be checked
-        # for consistency with the already-known facts, and if they are inconsistent then an
-        # exception will be raised. These things aren't actually used -- they are just tested
-        # for consistency and ignored. Finally: things which are deprecated -- they ought not be
-        # in the UEB at all, and if they are present then a warning will be logged but they are
-        # otherwise ignored.
-
-       # First, things that we really need to learn from the UEB: segment_size,
-        # crypttext_root_hash, and share_root_hash.
+        # There are several kinds of things that can be found in a UEB.
+        # First, things that we really need to learn from the UEB in order to
+        # do this download. Next: things which are optional but not redundant
+        # -- if they are present in the UEB they will get used. Next, things
+        # that are optional and redundant. These things are required to be
+        # consistent: they don't have to be in the UEB, but if they are in
+        # the UEB then they will be checked for consistency with the
+        # already-known facts, and if they are inconsistent then an exception
+        # will be raised. These things aren't actually used -- they are just
+        # tested for consistency and ignored. Finally: things which are
+        # deprecated -- they ought not be in the UEB at all, and if they are
+        # present then a warning will be logged but they are otherwise
+        # ignored.
+
+        # First, things that we really need to learn from the UEB:
+        # segment_size, crypttext_root_hash, and share_root_hash.
         self.segment_size = d['segment_size']
 
-        self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
-        self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)
+        self.block_size = mathutil.div_ceil(self.segment_size,
+                                            self._verifycap.needed_shares)
+        self.num_segments = mathutil.div_ceil(self._verifycap.size,
+                                              self.segment_size)
 
         self.tail_data_size = self._verifycap.size % self.segment_size
         if not self.tail_data_size:
             self.tail_data_size = self.segment_size
         # padding for erasure code
-        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
-
-        # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
-        # matches this read-cap or verify-cap.  The integrity check on the shares is not
-        # sufficient to prevent the original encoder from creating some shares of file A and
-        # other shares of file B.
+        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
+                                                        self._verifycap.needed_shares)
+
+        # Ciphertext hash tree root is mandatory, so that there is at most
+        # one ciphertext that matches this read-cap or verify-cap. The
+        # integrity check on the shares is not sufficient to prevent the
+        # original encoder from creating some shares of file A and other
+        # shares of file B.
         self.crypttext_root_hash = d['crypttext_root_hash']
 
         self.share_root_hash = d['share_root_hash']
@@ -229,8 +246,9 @@ class ValidatedExtendedURIProxy:
                 raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
 
 
-        # Next: things that are optional, redundant, and required to be consistent: codec_name,
-        # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
+        # Next: things that are optional, redundant, and required to be
+        # consistent: codec_name, codec_params, tail_codec_params,
+        # num_segments, size, needed_shares, total_shares
         if d.has_key('codec_name'):
             if d['codec_name'] != "crs":
                 raise UnsupportedErasureCodec(d['codec_name'])
@@ -238,16 +256,17 @@ class ValidatedExtendedURIProxy:
         if d.has_key('codec_params'):
             ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
             if ucpss != self.segment_size:
-                raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
-                                      "self.segment_size: %s" % (ucpss, self.segment_size))
+                raise BadURIExtension("inconsistent erasure code params: "
+                                      "ucpss: %s != self.segment_size: %s" %
+                                      (ucpss, self.segment_size))
             if ucpns != self._verifycap.needed_shares:
                 raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
-                                      "self._verifycap.needed_shares: %s" % (ucpns,
-                                                                             self._verifycap.needed_shares))
+                                      "self._verifycap.needed_shares: %s" %
+                                      (ucpns, self._verifycap.needed_shares))
             if ucpts != self._verifycap.total_shares:
                 raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
-                                      "self._verifycap.total_shares: %s" % (ucpts,
-                                                                            self._verifycap.total_shares))
+                                      "self._verifycap.total_shares: %s" %
+                                      (ucpts, self._verifycap.total_shares))
 
         if d.has_key('tail_codec_params'):
             utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
@@ -291,7 +310,8 @@ class ValidatedExtendedURIProxy:
                                       "total shares: %s" % (self._verifycap.total_shares,
                                                             d['total_shares']))
 
-        # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
+        # Finally, things that are deprecated and ignored: plaintext_hash,
+        # plaintext_root_hash
         if d.get('plaintext_hash'):
             log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                     "and is no longer used.  Ignoring.  %s" % (self,))
@@ -302,27 +322,33 @@ class ValidatedExtendedURIProxy:
         return self
 
     def start(self):
-        """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
-        it.  Returns a deferred which is called back with self once the fetch is successful, or
-        is erred back if it fails. """
+        """Fetch the UEB from bucket, compare its hash to the hash from
+        verifycap, then parse it. Returns a deferred which is called back
+        with self once the fetch is successful, or is erred back if it
+        fails."""
         d = self._readbucketproxy.get_uri_extension()
         d.addCallback(self._check_integrity)
         d.addCallback(self._parse_and_validate)
         return d
 
 class ValidatedReadBucketProxy(log.PrefixingLogMixin):
-    """I am a front-end for a remote storage bucket, responsible for retrieving and validating
-    data from that bucket.
+    """I am a front-end for a remote storage bucket, responsible for
+    retrieving and validating data from that bucket.
 
     My get_block() method is used by BlockDownloaders.
     """
 
-    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
-        """ share_hash_tree is required to have already been initialized with the root hash
-        (the number-0 hash), using the share_root_hash from the UEB """
+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
+                 block_size, share_size):
+        """ share_hash_tree is required to have already been initialized with
+        the root hash (the number-0 hash), using the share_root_hash from the
+        UEB"""
         precondition(share_hash_tree[0] is not None, share_hash_tree)
-        prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
-        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
+        prefix = "%d-%s-%s" % (sharenum, bucket,
+                               base32.b2a_l(share_hash_tree[0][:8], 60))
+        log.PrefixingLogMixin.__init__(self,
+                                       facility="tahoe.immutable.download",
+                                       prefix=prefix)
         self.sharenum = sharenum
         self.bucket = bucket
         self.share_hash_tree = share_hash_tree
@@ -343,7 +369,8 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
         # We might need to grab some elements of our block hash tree, to
         # validate the requested block up to the share hash.
         blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
-        # We don't need the root of the block hash tree, as that comes in the share tree.
+        # We don't need the root of the block hash tree, as that comes in the
+        # share tree.
         blockhashesneeded.discard(0)
         d2 = self.bucket.get_block_hashes(blockhashesneeded)
 
@@ -353,14 +380,16 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
             thisblocksize = self.share_size % self.block_size
             if thisblocksize == 0:
                 thisblocksize = self.block_size
-        d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
+        d3 = self.bucket.get_block_data(blocknum,
+                                        self.block_size, thisblocksize)
 
         dl = deferredutil.gatherResults([d1, d2, d3])
         dl.addCallback(self._got_data, blocknum)
         return dl
 
     def _got_data(self, results, blocknum):
-        precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
+        precondition(blocknum < self.num_blocks,
+                     self, blocknum, self.num_blocks)
         sharehashes, blockhashes, blockdata = results
         try:
             sharehashes = dict(sharehashes)
@@ -374,22 +403,25 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
 
         try:
             if self.share_hash_tree.needed_hashes(self.sharenum):
-                # This will raise exception if the values being passed do not match the root
-                # node of self.share_hash_tree.
+                # This will raise exception if the values being passed do not
+                # match the root node of self.share_hash_tree.
                 try:
                     self.share_hash_tree.set_hashes(sharehashes)
                 except IndexError, le:
-                    # Weird -- sharehashes contained index numbers outside of the range that fit
-                    # into this hash tree.
+                    # Weird -- sharehashes contained index numbers outside of
+                    # the range that fit into this hash tree.
                     raise BadOrMissingHash(le)
 
-            # To validate a block we need the root of the block hash tree, which is also one of
-            # the leafs of the share hash tree, and is called "the share hash".
+            # To validate a block we need the root of the block hash tree,
+            # which is also one of the leafs of the share hash tree, and is
+            # called "the share hash".
             if not self.block_hash_tree[0]: # empty -- no root node yet
                 # Get the share hash from the share hash tree.
                 share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                 if not share_hash:
-                    raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
+                    # No root node in block_hash_tree and also the share hash
+                    # wasn't sent by the server.
+                    raise hashtree.NotEnoughHashesError
                 self.block_hash_tree.set_hashes({0: share_hash})
 
             if self.block_hash_tree.needed_hashes(blocknum):
@@ -662,7 +694,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         if IConsumer.providedBy(target):
             target.registerProducer(self, True)
         self._target = target
-        self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
+        # Repairer (uploader) needs the storageindex.
+        self._target.set_storageindex(self._storage_index)
         self._monitor = monitor
         self._opened = False
 
@@ -715,7 +748,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         # first step: who should we download from?
         d = defer.maybeDeferred(self._get_all_shareholders)
         d.addCallback(self._got_all_shareholders)
-        # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
+        # now get the uri_extension block from somebody and integrity check
+        # it and parse and validate its contents
         d.addCallback(self._obtain_uri_extension)
         d.addCallback(self._get_crypttext_hash_tree)
         # once we know that, we can download blocks from everybody
@@ -734,11 +768,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
                 self._status.set_status("Failed")
                 self._status.set_active(False)
             if why.check(DownloadStopped):
-                # DownloadStopped just means the consumer aborted the download; not so scary.
+                # DownloadStopped just means the consumer aborted the
+                # download; not so scary.
                 self.log("download stopped", level=log.UNUSUAL)
             else:
                 # This is really unusual, and deserves maximum forensics.
-                self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
+                self.log("download failed!", failure=why, level=log.SCARY,
+                         umid="lp1vaQ")
             return why
         d.addErrback(_failed)
         d.addCallback(self._done)
@@ -885,12 +921,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         if self._status:
             self._status.set_status("Retrieving crypttext hash tree")
 
-        vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid)
+        vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
+                                     log_id=self._parentmsgid)
         d = vto.start()
 
         def _got_crypttext_hash_tree(res):
-            # Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated
-            # with hashes.
+            # Good -- the self._crypttext_hash_tree that we passed to vchtp
+            # is now populated with hashes.
             if self._results:
                 elapsed = time.time() - _get_crypttext_hash_tree_started
                 self._results.timings["hashtrees"] = elapsed
@@ -898,15 +935,16 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         return d
 
     def _activate_enough_buckets(self):
-        """either return a mapping from shnum to a ValidatedReadBucketProxy that can
-        provide data for that share, or raise NotEnoughSharesError"""
+        """either return a mapping from shnum to a ValidatedReadBucketProxy
+        that can provide data for that share, or raise NotEnoughSharesError"""
 
         while len(self.active_buckets) < self._verifycap.needed_shares:
             # need some more
             handled_shnums = set(self.active_buckets.keys())
             available_shnums = set(self._share_vbuckets.keys())
             potential_shnums = list(available_shnums - handled_shnums)
-            if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
+            if len(potential_shnums) < (self._verifycap.needed_shares
+                                        - len(self.active_buckets)):
                 have = len(potential_shnums) + len(self.active_buckets)
                 msg = "Unable to activate enough shares: have %d, need %d" \
                       % (have, self._verifycap.needed_shares)
@@ -914,8 +952,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
                     raise NotEnoughSharesError(msg)
                 else:
                     raise NoSharesError(msg)
-            # For the next share, choose a primary share if available, else a randomly chosen
-            # secondary share.
+            # For the next share, choose a primary share if available, else a
+            # randomly chosen secondary share.
             potential_shnums.sort()
             if potential_shnums[0] < self._verifycap.needed_shares:
                 shnum = potential_shnums[0]
@@ -969,7 +1007,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
                     100.0 * segnum / self._vup.num_segments))
         # memory footprint: when the SegmentDownloader finishes pulling down
         # all shares, we have 1*segment_size of usage.
-        segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
+        segmentdler = SegmentDownloader(self, segnum,
+                                        self._verifycap.needed_shares,
                                         self._results)
         started = time.time()
         d = segmentdler.start()
@@ -1014,8 +1053,9 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
         if self._current_segnum + 1 == self._vup.num_segments:
             # This is the last segment.
-            # Trim off any padding added by the upload side.  We never send empty segments. If
-            # the data was an exact multiple of the segment size, the last segment will be full.
+            # Trim off any padding added by the upload side. We never send
+            # empty segments. If the data was an exact multiple of the
+            # segment size, the last segment will be full.
             tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
             num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
             # Remove buffers which don't contain any part of the tail.
@@ -1089,9 +1129,10 @@ class FileName:
         pass # we won't use it
     def finish(self):
         pass
-    # The following methods are just because the target might be a repairer.DownUpConnector,
-    # and just because the current CHKUpload object expects to find the storage index and
-    # encoding parameters in its Uploadable.
+    # The following methods are just because the target might be a
+    # repairer.DownUpConnector, and just because the current CHKUpload object
+    # expects to find the storage index and encoding parameters in its
+    # Uploadable.
     def set_storageindex(self, storageindex):
         pass
     def set_encodingparams(self, encodingparams):
@@ -1114,9 +1155,10 @@ class Data:
         pass # we won't use it
     def finish(self):
         return self.data
-    # The following methods are just because the target might be a repairer.DownUpConnector,
-    # and just because the current CHKUpload object expects to find the storage index and
-    # encoding parameters in its Uploadable.
+    # The following methods are just because the target might be a
+    # repairer.DownUpConnector, and just because the current CHKUpload object
+    # expects to find the storage index and encoding parameters in its
+    # Uploadable.
     def set_storageindex(self, storageindex):
         pass
     def set_encodingparams(self, encodingparams):
@@ -1144,9 +1186,10 @@ class FileHandle:
         pass
     def finish(self):
         return self._filehandle
-    # The following methods are just because the target might be a repairer.DownUpConnector,
-    # and just because the current CHKUpload object expects to find the storage index and
-    # encoding parameters in its Uploadable.
+    # The following methods are just because the target might be a
+    # repairer.DownUpConnector, and just because the current CHKUpload object
+    # expects to find the storage index and encoding parameters in its
+    # Uploadable.
     def set_storageindex(self, storageindex):
         pass
     def set_encodingparams(self, encodingparams):
@@ -1175,9 +1218,10 @@ class ConsumerAdapter:
         pass
     def finish(self):
         return self._consumer
-    # The following methods are just because the target might be a repairer.DownUpConnector,
-    # and just because the current CHKUpload object expects to find the storage index and
-    # encoding parameters in its Uploadable.
+    # The following methods are just because the target might be a
+    # repairer.DownUpConnector, and just because the current CHKUpload object
+    # expects to find the storage index and encoding parameters in its
+    # Uploadable.
     def set_storageindex(self, storageindex):
         pass
     def set_encodingparams(self, encodingparams):