self.target.close()
def finish(self):
return self.target.finish()
- # The following methods is just to pass through to the next target, and just because that
- # target might be a repairer.DownUpConnector, and just because the current CHKUpload object
- # expects to find the storage index in its Uploadable.
+ # The following methods just pass through to the next target, because
+ # that target might be a repairer.DownUpConnector and because the
+ # current CHKUpload object expects to find the storage index in its
+ # Uploadable.
def set_storageindex(self, storageindex):
self.target.set_storageindex(storageindex)
def set_encodingparams(self, encodingparams):
def _try_the_next_one(self):
vtp = self._validatedthingproxies.pop(0)
- d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
+ # start() obtains and validates the thing, then calls back with it,
+ # or else errbacks
+ d = vtp.start()
d.addErrback(self._bad, vtp)
return d
class ValidatedCrypttextHashTreeProxy:
implements(IValidatedThingProxy)
- """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
- its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
- start() method that fires with self once I get a valid one). """
- def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
+ """ I am a front-end for a remote crypttext hash tree using a local
+ ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
+ Validated Thing protocol (i.e., I have a start() method that fires with
+ self once I get a valid one)."""
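+ # Illustrative sketch of the Validated Thing protocol, mirroring
+ # _try_the_next_one() above (not part of the original source):
+ #   d = proxy.start()               # fires with the validated proxy,
+ #   d.addErrback(handle_bad, proxy) # or errbacks on a bad/missing hash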
+ def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
+ fetch_failures=None):
# fetch_failures is for debugging -- see test_encode.py
self._readbucketproxy = readbucketproxy
self._num_segments = num_segments
if self._fetch_failures is not None:
self._fetch_failures["crypttext_hash_tree"] += 1
raise BadOrMissingHash(le)
- # If we now have enough of the crypttext hash tree to integrity-check *any* segment of ciphertext, then we are done.
- # TODO: It would have better alacrity if we downloaded only part of the crypttext hash tree at a time.
+ # If we now have enough of the crypttext hash tree to integrity-check
+ # *any* segment of ciphertext, then we are done. TODO: It would have
+ # better alacrity if we downloaded only part of the crypttext hash
+ # tree at a time.
for segnum in range(self._num_segments):
if self._crypttext_hash_tree.needed_hashes(segnum):
raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
class ValidatedExtendedURIProxy:
implements(IValidatedThingProxy)
- """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
- retrieving and validating the elements from the UEB. """
+ """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
+ responsible for retrieving and validating the elements from the UEB."""
def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
# fetch_failures is for debugging -- see test_encode.py
h = hashutil.uri_extension_hash(data)
if h != self._verifycap.uri_extension_hash:
msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
- (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
+ (self._readbucketproxy,
+ base32.b2a(self._verifycap.uri_extension_hash),
+ base32.b2a(h)))
if self._fetch_failures is not None:
self._fetch_failures["uri_extension"] += 1
raise BadURIExtensionHashValue(msg)
return data
def _parse_and_validate(self, data):
- self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)
+ self.share_size = mathutil.div_ceil(self._verifycap.size,
+ self._verifycap.needed_shares)
d = uri.unpack_extension(data)
- # There are several kinds of things that can be found in a UEB. First, things that we
- # really need to learn from the UEB in order to do this download. Next: things which are
- # optional but not redundant -- if they are present in the UEB they will get used. Next,
- # things that are optional and redundant. These things are required to be consistent:
- # they don't have to be in the UEB, but if they are in the UEB then they will be checked
- # for consistency with the already-known facts, and if they are inconsistent then an
- # exception will be raised. These things aren't actually used -- they are just tested
- # for consistency and ignored. Finally: things which are deprecated -- they ought not be
- # in the UEB at all, and if they are present then a warning will be logged but they are
- # otherwise ignored.
-
- # First, things that we really need to learn from the UEB: segment_size,
- # crypttext_root_hash, and share_root_hash.
+ # There are several kinds of things that can be found in a UEB.
+ # First, things that we really need to learn from the UEB in order to
+ # do this download. Next: things which are optional but not redundant
+ # -- if they are present in the UEB they will get used. Next, things
+ # that are optional and redundant. These things are required to be
+ # consistent: they don't have to be in the UEB, but if they are in
+ # the UEB then they will be checked for consistency with the
+ # already-known facts, and if they are inconsistent then an exception
+ # will be raised. These things aren't actually used -- they are just
+ # tested for consistency and ignored. Finally: things which are
+ # deprecated -- they ought not be in the UEB at all, and if they are
+ # present then a warning will be logged but they are otherwise
+ # ignored.
+
+ # First, things that we really need to learn from the UEB:
+ # segment_size, crypttext_root_hash, and share_root_hash.
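+ # Hypothetical example of an unpacked UEB dict, with values invented
+ # for orientation only:
+ #   {'segment_size': 131072, 'crypttext_root_hash': <32-byte hash>,
+ #    'share_root_hash': <32-byte hash>, 'codec_name': 'crs',
+ #    'codec_params': '131072-3-10', 'num_segments': 8,
+ #    'size': 1000000, 'needed_shares': 3, 'total_shares': 10}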
self.segment_size = d['segment_size']
- self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
- self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)
+ self.block_size = mathutil.div_ceil(self.segment_size,
+ self._verifycap.needed_shares)
+ self.num_segments = mathutil.div_ceil(self._verifycap.size,
+ self.segment_size)
self.tail_data_size = self._verifycap.size % self.segment_size
if not self.tail_data_size:
self.tail_data_size = self.segment_size
# padding for erasure code
- self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
-
- # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
- # matches this read-cap or verify-cap. The integrity check on the shares is not
- # sufficient to prevent the original encoder from creating some shares of file A and
- # other shares of file B.
+ self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
+ self._verifycap.needed_shares)
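+ # Worked example (numbers invented): for size = 1000000,
+ # needed_shares = 3, and segment_size = 131072:
+ #   block_size        = div_ceil(131072, 3)       = 43691
+ #   num_segments      = div_ceil(1000000, 131072) = 8
+ #   tail_data_size    = 1000000 % 131072          = 82496
+ #   tail_segment_size = next_multiple(82496, 3)   = 82497
+ # i.e. the tail is padded by one byte so its length is divisible by
+ # needed_shares before erasure coding.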
+
+ # Ciphertext hash tree root is mandatory, so that there is at most
+ # one ciphertext that matches this read-cap or verify-cap. The
+ # integrity check on the shares is not sufficient to prevent the
+ # original encoder from creating some shares of file A and other
+ # shares of file B.
self.crypttext_root_hash = d['crypttext_root_hash']
self.share_root_hash = d['share_root_hash']
raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
- # Next: things that are optional, redundant, and required to be consistent: codec_name,
- # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
+ # Next: things that are optional, redundant, and required to be
+ # consistent: codec_name, codec_params, tail_codec_params,
+ # num_segments, size, needed_shares, total_shares
if d.has_key('codec_name'):
if d['codec_name'] != "crs":
raise UnsupportedErasureCodec(d['codec_name'])
if d.has_key('codec_params'):
ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
if ucpss != self.segment_size:
- raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
- "self.segment_size: %s" % (ucpss, self.segment_size))
+ raise BadURIExtension("inconsistent erasure code params: "
+ "ucpss: %s != self.segment_size: %s" %
+ (ucpss, self.segment_size))
if ucpns != self._verifycap.needed_shares:
raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
- "self._verifycap.needed_shares: %s" % (ucpns,
- self._verifycap.needed_shares))
+ "self._verifycap.needed_shares: %s" %
+ (ucpns, self._verifycap.needed_shares))
if ucpts != self._verifycap.total_shares:
raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
- "self._verifycap.total_shares: %s" % (ucpts,
- self._verifycap.total_shares))
+ "self._verifycap.total_shares: %s" %
+ (ucpts, self._verifycap.total_shares))
if d.has_key('tail_codec_params'):
utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
"total shares: %s" % (self._verifycap.total_shares,
d['total_shares']))
- # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
+ # Finally, things that are deprecated and ignored: plaintext_hash,
+ # plaintext_root_hash
if d.get('plaintext_hash'):
log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
"and is no longer used. Ignoring. %s" % (self,))
return self
def start(self):
- """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
- it. Returns a deferred which is called back with self once the fetch is successful, or
- is erred back if it fails. """
+ """Fetch the UEB from bucket, compare its hash to the hash from
+ verifycap, then parse it. Returns a deferred which is called back
+ with self once the fetch is successful, or is erred back if it
+ fails."""
d = self._readbucketproxy.get_uri_extension()
d.addCallback(self._check_integrity)
d.addCallback(self._parse_and_validate)
return d
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
- """I am a front-end for a remote storage bucket, responsible for retrieving and validating
- data from that bucket.
+ """I am a front-end for a remote storage bucket, responsible for
+ retrieving and validating data from that bucket.
My get_block() method is used by BlockDownloaders.
"""
- def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
- """ share_hash_tree is required to have already been initialized with the root hash
- (the number-0 hash), using the share_root_hash from the UEB """
+ def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
+ block_size, share_size):
+ """ share_hash_tree is required to have already been initialized with
+ the root hash (the number-0 hash), using the share_root_hash from the
+ UEB"""
precondition(share_hash_tree[0] is not None, share_hash_tree)
- prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
- log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
+ prefix = "%d-%s-%s" % (sharenum, bucket,
+ base32.b2a_l(share_hash_tree[0][:8], 60))
+ log.PrefixingLogMixin.__init__(self,
+ facility="tahoe.immutable.download",
+ prefix=prefix)
self.sharenum = sharenum
self.bucket = bucket
self.share_hash_tree = share_hash_tree
# We might need to grab some elements of our block hash tree, to
# validate the requested block up to the share hash.
blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
- # We don't need the root of the block hash tree, as that comes in the share tree.
+ # We don't need the root of the block hash tree, as that comes in the
+ # share tree.
blockhashesneeded.discard(0)
d2 = self.bucket.get_block_hashes(blockhashesneeded)
thisblocksize = self.share_size % self.block_size
if thisblocksize == 0:
thisblocksize = self.block_size
- d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
+ d3 = self.bucket.get_block_data(blocknum,
+ self.block_size, thisblocksize)
dl = deferredutil.gatherResults([d1, d2, d3])
dl.addCallback(self._got_data, blocknum)
return dl
def _got_data(self, results, blocknum):
- precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
+ precondition(blocknum < self.num_blocks,
+ self, blocknum, self.num_blocks)
sharehashes, blockhashes, blockdata = results
try:
sharehashes = dict(sharehashes)
try:
if self.share_hash_tree.needed_hashes(self.sharenum):
- # This will raise exception if the values being passed do not match the root
- # node of self.share_hash_tree.
+ # This will raise an exception if the values being passed do not
+ # match the root node of self.share_hash_tree.
try:
self.share_hash_tree.set_hashes(sharehashes)
except IndexError, le:
- # Weird -- sharehashes contained index numbers outside of the range that fit
- # into this hash tree.
+ # Weird -- sharehashes contained index numbers outside of
+ # the range that fits into this hash tree.
raise BadOrMissingHash(le)
- # To validate a block we need the root of the block hash tree, which is also one of
- # the leafs of the share hash tree, and is called "the share hash".
+ # To validate a block we need the root of the block hash tree,
+ # which is also one of the leaves of the share hash tree, and is
+ # called "the share hash".
if not self.block_hash_tree[0]: # empty -- no root node yet
# Get the share hash from the share hash tree.
share_hash = self.share_hash_tree.get_leaf(self.sharenum)
if not share_hash:
- raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
+ # No root node in block_hash_tree and also the share hash
+ # wasn't sent by the server.
+ raise hashtree.NotEnoughHashesError
self.block_hash_tree.set_hashes({0: share_hash})
if self.block_hash_tree.needed_hashes(blocknum):
if IConsumer.providedBy(target):
target.registerProducer(self, True)
self._target = target
- self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
+ # Repairer (uploader) needs the storageindex.
+ self._target.set_storageindex(self._storage_index)
self._monitor = monitor
self._opened = False
# first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
d.addCallback(self._got_all_shareholders)
- # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
+ # now get the uri_extension block from somebody, integrity-check it,
+ # and parse and validate its contents
d.addCallback(self._obtain_uri_extension)
d.addCallback(self._get_crypttext_hash_tree)
# once we know that, we can download blocks from everybody
self._status.set_status("Failed")
self._status.set_active(False)
if why.check(DownloadStopped):
- # DownloadStopped just means the consumer aborted the download; not so scary.
+ # DownloadStopped just means the consumer aborted the
+ # download; not so scary.
self.log("download stopped", level=log.UNUSUAL)
else:
# This is really unusual, and deserves maximum forensics.
- self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
+ self.log("download failed!", failure=why, level=log.SCARY,
+ umid="lp1vaQ")
return why
d.addErrback(_failed)
d.addCallback(self._done)
if self._status:
self._status.set_status("Retrieving crypttext hash tree")
- vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid)
+ vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
+ log_id=self._parentmsgid)
d = vto.start()
def _got_crypttext_hash_tree(res):
- # Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated
- # with hashes.
+ # Good -- the self._crypttext_hash_tree that we passed to vchtp
+ # is now populated with hashes.
if self._results:
elapsed = time.time() - _get_crypttext_hash_tree_started
self._results.timings["hashtrees"] = elapsed
return d
def _activate_enough_buckets(self):
- """either return a mapping from shnum to a ValidatedReadBucketProxy that can
- provide data for that share, or raise NotEnoughSharesError"""
+ """either return a mapping from shnum to a ValidatedReadBucketProxy
+ that can provide data for that share, or raise NotEnoughSharesError"""
while len(self.active_buckets) < self._verifycap.needed_shares:
# need some more
handled_shnums = set(self.active_buckets.keys())
available_shnums = set(self._share_vbuckets.keys())
potential_shnums = list(available_shnums - handled_shnums)
- if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
+ if len(potential_shnums) < (self._verifycap.needed_shares
+ - len(self.active_buckets)):
have = len(potential_shnums) + len(self.active_buckets)
msg = "Unable to activate enough shares: have %d, need %d" \
% (have, self._verifycap.needed_shares)
raise NotEnoughSharesError(msg)
else:
raise NoSharesError(msg)
- # For the next share, choose a primary share if available, else a randomly chosen
- # secondary share.
+ # For the next share, choose a primary share if available, else a
+ # randomly chosen secondary share.
potential_shnums.sort()
if potential_shnums[0] < self._verifycap.needed_shares:
shnum = potential_shnums[0]
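+ # E.g. (illustrative): with needed_shares = 3 and
+ # potential_shnums = [1, 5, 9], share 1 is chosen first. Under the
+ # systematic CRS codec the primary shares (shnum < needed_shares)
+ # carry the data blocks verbatim, so no FEC decoding is needed when
+ # all primaries are available.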
100.0 * segnum / self._vup.num_segments))
# memory footprint: when the SegmentDownloader finishes pulling down
# all shares, we have 1*segment_size of usage.
- segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
+ segmentdler = SegmentDownloader(self, segnum,
+ self._verifycap.needed_shares,
self._results)
started = time.time()
d = segmentdler.start()
if self._current_segnum + 1 == self._vup.num_segments:
# This is the last segment.
- # Trim off any padding added by the upload side. We never send empty segments. If
- # the data was an exact multiple of the segment size, the last segment will be full.
+ # Trim off any padding added by the upload side. We never send
+ # empty segments. If the data was an exact multiple of the
+ # segment size, the last segment will be full.
tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
# Remove buffers which don't contain any part of the tail.
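+ # Worked example (numbers invented): with needed_shares = 3 and
+ # tail_data_size = 4, tail_segment_size = next_multiple(4, 3) = 6,
+ # tail_buf_size = div_ceil(6, 3) = 2, and num_buffers_used =
+ # div_ceil(4, 2) = 2 -- the third buffer holds only padding and is
+ # removed.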
pass # we won't use it
def finish(self):
pass
- # The following methods are just because the target might be a repairer.DownUpConnector,
- # and just because the current CHKUpload object expects to find the storage index and
- # encoding parameters in its Uploadable.
+ # The following methods exist only because the target might be a
+ # repairer.DownUpConnector, and because the current CHKUpload object
+ # expects to find the storage index and encoding parameters in its
+ # Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass # we won't use it
def finish(self):
return self.data
- # The following methods are just because the target might be a repairer.DownUpConnector,
- # and just because the current CHKUpload object expects to find the storage index and
- # encoding parameters in its Uploadable.
+ # The following methods exist only because the target might be a
+ # repairer.DownUpConnector, and because the current CHKUpload object
+ # expects to find the storage index and encoding parameters in its
+ # Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass
def finish(self):
return self._filehandle
- # The following methods are just because the target might be a repairer.DownUpConnector,
- # and just because the current CHKUpload object expects to find the storage index and
- # encoding parameters in its Uploadable.
+ # The following methods exist only because the target might be a
+ # repairer.DownUpConnector, and because the current CHKUpload object
+ # expects to find the storage index and encoding parameters in its
+ # Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass
def finish(self):
return self._consumer
- # The following methods are just because the target might be a repairer.DownUpConnector,
- # and just because the current CHKUpload object expects to find the storage index and
- # encoding parameters in its Uploadable.
+ # The following methods exist only because the target might be a
+ # repairer.DownUpConnector, and because the current CHKUpload object
+ # expects to find the storage index and encoding parameters in its
+ # Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):