from foolscap.eventual import eventually
from allmydata.util import base32, mathutil, hashutil, log, observer
-from allmydata.util.assertutil import _assert
+from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, storage, uri
-from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
- IDownloadStatus, IDownloadResults, NotEnoughSharesError
+from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
+ IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
pass
class BadURIExtension(IntegrityCheckError):
pass
-class BadPlaintextHashValue(IntegrityCheckError):
+class UnsupportedErasureCodec(BadURIExtension):
pass
class BadCrypttextHashValue(IntegrityCheckError):
pass
self.downloadable = downloadable
self._decryptor = AES(key)
self._crypttext_hasher = hashutil.crypttext_hasher()
- self._plaintext_hasher = hashutil.plaintext_hasher()
self.length = 0
self.total_length = total_length
self._segment_number = 0
- self._plaintext_hash_tree = None
self._crypttext_hash_tree = None
self._opened = False
self._log_parent = log_parent
kwargs["facility"] = "download.output"
return log.msg(*args, **kwargs)
- def setup_hashtrees(self, plaintext_hashtree, crypttext_hashtree):
- self._plaintext_hash_tree = plaintext_hashtree
- self._crypttext_hash_tree = crypttext_hashtree
+ def got_crypttext_hash_tree(self, crypttext_hash_tree):
+ self._crypttext_hash_tree = crypttext_hash_tree
def write_segment(self, crypttext):
self.length += len(crypttext)
del crypttext
# now we're back down to 1*segment_size.
-
- self._plaintext_hasher.update(plaintext)
- if self._plaintext_hash_tree:
- ph = hashutil.plaintext_segment_hasher()
- ph.update(plaintext)
- plaintext_leaves = {self._segment_number: ph.digest()}
- self.log(format="plaintext leaf hash (%(bytes)sB) [%(segnum)d] is %(hash)s",
- bytes=len(plaintext),
- segnum=self._segment_number, hash=base32.b2a(ph.digest()),
- level=log.NOISY)
- self._plaintext_hash_tree.set_hashes(leaves=plaintext_leaves)
-
self._segment_number += 1
# We're still at 1*segment_size. The Downloadable is responsible for
# any memory usage beyond this.
def close(self):
self.crypttext_hash = self._crypttext_hasher.digest()
- self.plaintext_hash = self._plaintext_hasher.digest()
self.log("download finished, closing IDownloadable", level=log.NOISY)
self.downloadable.close()
def finish(self):
return self.downloadable.finish()
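
[Reviewer note: a minimal sketch, not part of this patch, of the pycryptopp AES usage that Output relies on. pycryptopp's AES runs in CTR mode, so the same operation both encrypts and decrypts, and process() may be called incrementally, one segment at a time.]

from pycryptopp.cipher.aes import AES

key = "\x00" * 16                       # illustrative 128-bit key
c = AES(key).process("some plaintext")  # encrypt
assert AES(key).process(c) == "some plaintext"  # a fresh AES(key) decrypts
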
-class ValidatedBucket:
+class ValidatedThingObtainer:
+ def __init__(self, validatedthingproxies, debugname, log_id):
+ self._validatedthingproxies = validatedthingproxies
+ self._debugname = debugname
+ self._log_id = log_id
+
+ def _bad(self, f, validatedthingproxy):
+ level = log.WEIRD
+ if f.check(DeadReferenceError):
+ level = log.UNUSUAL
+ log.msg(parent=self._log_id, facility="tahoe.immutable.download",
+ format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
+ op=self._debugname, validatedthingproxy=str(validatedthingproxy),
+ failure=f, level=level, umid="JGXxBA")
+ if not self._validatedthingproxies:
+ raise NotEnoughSharesError("ran out of peers, last error was %s" % (f,))
+ # try again with a different one
+ return self._try_the_next_one()
+
+ def _try_the_next_one(self):
+ vtp = self._validatedthingproxies.pop(0)
+ d = vtp.start() # start() obtains and validates the thing, then fires the deferred with it, or else errbacks
+ d.addErrback(self._bad, vtp)
+ return d
+
+ def start(self):
+ return self._try_the_next_one()
+
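
[Reviewer note: a minimal usage sketch, not part of this patch, of the failover pattern above. FakeProxy is a hypothetical stand-in for a real IValidatedThingProxy provider; all the obtainer needs is a start() method that returns a deferred.]

from twisted.internet import defer

class FakeProxy:
    def __init__(self, thing, fail=False):
        self.thing = thing
        self.fail = fail
    def start(self):
        if self.fail:
            return defer.fail(Exception("validation failed"))
        return defer.succeed(self.thing)

# the obtainer pops proxies in order, falling over to the next on failure
obtainer = ValidatedThingObtainer([FakeProxy(None, fail=True),
                                   FakeProxy("good thing")],
                                  debugname="demo", log_id=None)
d = obtainer.start() # fires with "good thing" after the first proxy errbacks
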
+class ValidatedCrypttextHashTreeProxy:
+ """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
+ its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
+ start() method that returns a deferred which fires with self once I have fetched and
+ validated the hashes). """
+ implements(IValidatedThingProxy)
+ def __init__(self, readbucketproxy, crypttext_hash_tree, fetch_failures=None):
+ # fetch_failures is for debugging -- see test_encode.py
+ self._readbucketproxy = readbucketproxy
+ self._fetch_failures = fetch_failures
+ self._crypttext_hash_tree = crypttext_hash_tree
+
+ def _validate(self, proposal):
+ ct_hashes = dict(list(enumerate(proposal)))
+ try:
+ self._crypttext_hash_tree.set_hashes(ct_hashes)
+ except hashtree.BadHashError:
+ if self._fetch_failures is not None:
+ self._fetch_failures["crypttext_hash_tree"] += 1
+ raise
+ return self
+
+ def start(self):
+ d = self._readbucketproxy.startIfNecessary()
+ d.addCallback(lambda ignored: self._readbucketproxy.get_crypttext_hashes())
+ d.addCallback(self._validate)
+ return d
+
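
[Reviewer note: a sketch, not part of this patch, of the hashtree validation that _validate above performs, using the allmydata.hashtree API with illustrative leaf values.]

from allmydata import hashtree
from allmydata.util import hashutil

leaves = [hashutil.tagged_hash("segment", "%d" % i) for i in range(4)]
full = hashtree.HashTree(leaves)          # the tree the uploader built
partial = hashtree.IncompleteHashTree(4)  # what the downloader starts with
partial.set_hashes({0: full[0]})          # root hash learned from the UEB
# a proposed list of all node hashes either completes the tree or raises
# hashtree.BadHashError if it disagrees with the already-pinned root:
partial.set_hashes(dict(enumerate(list(full))))
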
+class ValidatedExtendedURIProxy:
+ """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
+ retrieving and validating the elements from the UEB. """
+ implements(IValidatedThingProxy)
+
+ def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
+ # fetch_failures is for debugging -- see test_encode.py
+ self._fetch_failures = fetch_failures
+ self._readbucketproxy = readbucketproxy
+ precondition(IVerifierURI.providedBy(verifycap), verifycap)
+ self._verifycap = verifycap
+
+ # required
+ self.segment_size = None
+ self.crypttext_root_hash = None
+ self.share_root_hash = None
+
+ # computed
+ self.num_segments = None
+ self.tail_segment_size = None
+
+ # optional
+ self.crypttext_hash = None
+
+ def __str__(self):
+ return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
+
+ def _check_integrity(self, data):
+ h = hashutil.uri_extension_hash(data)
+ if h != self._verifycap.uri_extension_hash:
+ msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
+ (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
+ if self._fetch_failures is not None:
+ self._fetch_failures["uri_extension"] += 1
+ raise BadURIExtensionHashValue(msg)
+ else:
+ return data
+
+ def _parse_and_validate(self, data):
+ d = uri.unpack_extension(data)
+
+ # There are several kinds of things that can be found in a UEB. First, things that we
+ # really need to learn from the UEB in order to do this download. Next: things which are
+ # optional but not redundant -- if they are present in the UEB they will get used. Next,
+ # things that are optional and redundant. These things are required to be consistent:
+ # they don't have to be in the UEB, but if they are in the UEB then they will be checked
+ # for consistency with the already-known facts, and if they are inconsistent then an
+ # exception will be raised. These things aren't actually used -- they are just tested
+ # for consistency and ignored. Finally: things which are deprecated -- they ought not be
+ # in the UEB at all, and if they are present then a warning will be logged but they are
+ # otherwise ignored.
+
+ # First, things that we really need to learn from the UEB: segment_size,
+ # crypttext_root_hash, and share_root_hash.
+ self.segment_size = d['segment_size']
+
+ self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)
+
+ tail_data_size = self._verifycap.size % self.segment_size
+ if not tail_data_size:
+ tail_data_size = self.segment_size
+ # padding for erasure code
+ self.tail_segment_size = mathutil.next_multiple(tail_data_size, self._verifycap.needed_shares)
+
+ # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
+ # matches this read-cap or verify-cap. The integrity check on the shares is not
+ # sufficient to prevent the original encoder from creating some shares of file A and
+ # other shares of file B.
+ self.crypttext_root_hash = d['crypttext_root_hash']
+
+ self.share_root_hash = d['share_root_hash']
+
+
+ # Next: things that are optional and not redundant: crypttext_hash
+ if d.has_key('crypttext_hash'):
+ self.crypttext_hash = d['crypttext_hash']
+ if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
+ raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
+
+
+ # Next: things that are optional, redundant, and required to be consistent: codec_name,
+ # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
+ if d.has_key('codec_name'):
+ if d['codec_name'] != "crs":
+ raise UnsupportedErasureCodec(d['codec_name'])
+
+ if d.has_key('codec_params'):
+ ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
+ if ucpss != self.segment_size:
+ raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
+ "self.segment_size: %s" % (ucpss, self.segment_size))
+ if ucpns != self._verifycap.needed_shares:
+ raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
+ "self._verifycap.needed_shares: %s" % (ucpns,
+ self._verifycap.needed_shares))
+ if ucpts != self._verifycap.total_shares:
+ raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
+ "self._verifycap.total_shares: %s" % (ucpts,
+ self._verifycap.total_shares))
+
+ if d.has_key('tail_codec_params'):
+ utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
+ if utcpss != self.tail_segment_size:
+ raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
+ "self.tail_segment_size: %s, self._verifycap.size: %s, "
+ "self.segment_size: %s, self._verifycap.needed_shares: %s"
+ % (utcpss, self.tail_segment_size, self._verifycap.size,
+ self.segment_size, self._verifycap.needed_shares))
+ if utcpns != self._verifycap.needed_shares:
+ raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
+ "self._verifycap.needed_shares: %s" % (utcpns,
+ self._verifycap.needed_shares))
+ if utcpts != self._verifycap.total_shares:
+ raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
+ "self._verifycap.total_shares: %s" % (utcpts,
+ self._verifycap.total_shares))
+
+ if d.has_key('num_segments'):
+ if d['num_segments'] != self.num_segments:
+ raise BadURIExtension("inconsistent num_segments: size: %s, "
+ "segment_size: %s, computed_num_segments: %s, "
+ "ueb_num_segments: %s" % (self._verifycap.size,
+ self.segment_size,
+ self.num_segments, d['num_segments']))
+
+ if d.has_key('size'):
+ if d['size'] != self._verifycap.size:
+ raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
+ (self._verifycap.size, d['size']))
+
+ if d.has_key('needed_shares'):
+ if d['needed_shares'] != self._verifycap.needed_shares:
+ raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
+ "needed shares: %s" % (self._verifycap.total_shares,
+ d['needed_shares']))
+
+ if d.has_key('total_shares'):
+ if d['total_shares'] != self._verifycap.total_shares:
+ raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
+ "total shares: %s" % (self._verifycap.total_shares,
+ d['total_shares']))
+
+ # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
+ if d.get('plaintext_hash'):
+ log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
+ "and is no longer used. Ignoring. %s" % (self,))
+ if d.get('plaintext_root_hash'):
+ log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
+ "reasons and is no longer used. Ignoring. %s" % (self,))
+
+ return self
+
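
[Reviewer note: the segment arithmetic of _parse_and_validate, worked with the constants used by the new test case in test_encode.py (SIZE=200, SEGSIZE=72, K=4); a sketch, not part of this patch.]

from allmydata.util import mathutil

assert mathutil.div_ceil(200, 72) == 3            # num_segments
tail_data = 200 % 72 or 72                        # 56 bytes of data in the tail
assert mathutil.next_multiple(tail_data, 4) == 56 # tail_segment_size, padded
                                                  # up to a multiple of K
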
+ def start(self):
+ """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
+ it. Returns a deferred which is called back with self once the fetch is successful, or
+ is erred back if it fails. """
+ d = self._readbucketproxy.startIfNecessary()
+ d.addCallback(lambda ignored: self._readbucketproxy.get_uri_extension())
+ d.addCallback(self._check_integrity)
+ d.addCallback(self._parse_and_validate)
+ return d
+
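
[Reviewer note: the UEB integrity check of _check_integrity in miniature; a sketch, not part of this patch, with illustrative data.]

from allmydata.util import hashutil

data = "fake uri extension contents"
expected = hashutil.uri_extension_hash(data)
assert hashutil.uri_extension_hash(data) == expected        # matching copy accepted
assert hashutil.uri_extension_hash(data + "x") != expected  # corrupt copy rejected
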
+class ValidatedReadBucketProxy:
"""I am a front-end for a remote storage bucket, responsible for
retrieving and validating data from that bucket.
"""
def __init__(self, sharenum, bucket,
- share_hash_tree, roothash,
+ share_hash_tree, share_root_hash,
num_blocks):
+ """ share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """
self.sharenum = sharenum
self.bucket = bucket
self._share_hash = None # None means not validated yet
self.share_hash_tree = share_hash_tree
- self._roothash = roothash
+ self._share_root_hash = share_root_hash
self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
self.started = False
try:
if not self._share_hash:
sh = dict(sharehashes)
- sh[0] = self._roothash # always use our own root, from the URI
+ sh[0] = self._share_root_hash # always use our own root, from the URI
sht = self.share_hash_tree
if sht.get_leaf_index(self.sharenum) not in sh:
raise hashtree.NotEnoughHashesError
else:
log.msg(" block data start/end: %r .. %r" %
(blockdata[:50], blockdata[-50:]))
- log.msg(" root hash: %s" % base32.b2a(self._roothash))
+ log.msg(" root hash: %s" % base32.b2a(self._share_root_hash))
log.msg(" share hash tree:\n" + self.share_hash_tree.dump())
log.msg(" block hash tree:\n" + self.block_hash_tree.dump())
lines = []
class FileDownloader:
implements(IPushProducer)
- check_crypttext_hash = True
- check_plaintext_hash = True
_status = None
def __init__(self, client, u, downloadable):
+ precondition(isinstance(u, uri.CHKFileURI), u)
self._client = client
- u = IFileURI(u)
+ self._uri = u
self._storage_index = u.storage_index
self._uri_extension_hash = u.uri_extension_hash
- self._total_shares = u.total_shares
- self._size = u.size
- self._num_needed_shares = u.needed_shares
+ self._vup = None # ValidatedExtendedURIProxy
self._si_s = storage.si_b2a(self._storage_index)
self.init_logging()
self._status = s = DownloadStatus()
s.set_status("Starting")
s.set_storage_index(self._storage_index)
- s.set_size(self._size)
+ s.set_size(self._uri.size)
s.set_helper(False)
s.set_active(True)
self._results = DownloadResults()
s.set_results(self._results)
- self._results.file_size = self._size
+ self._results.file_size = self._uri.size
self._results.timings["servers_peer_selection"] = {}
self._results.timings["fetch_per_server"] = {}
self._results.timings["cumulative_fetch"] = 0.0
if IConsumer.providedBy(downloadable):
downloadable.registerProducer(self, True)
self._downloadable = downloadable
- self._output = Output(downloadable, u.key, self._size, self._log_number,
+ self._output = Output(downloadable, u.key, self._uri.size, self._log_number,
self._status)
self.active_buckets = {} # k: shnum, v: bucket
self._share_buckets = [] # list of (sharenum, bucket) tuples
self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
- self._uri_extension_sources = []
- self._uri_extension_data = None
+ self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
- self._fetch_failures = {"uri_extension": 0,
- "plaintext_hashroot": 0,
- "plaintext_hashtree": 0,
- "crypttext_hashroot": 0,
- "crypttext_hashtree": 0,
- }
+ self._crypttext_hash_tree = None
def init_logging(self):
self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
self._status.set_active(False)
def start(self):
+ assert isinstance(self._uri, uri.CHKFileURI), (self._uri, type(self._uri))
self.log("starting download")
# first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
d.addCallback(self._got_all_shareholders)
- # now get the uri_extension block from somebody and validate it
+ # now get the uri_extension block from somebody, check its integrity, and parse and validate its contents
d.addCallback(self._obtain_uri_extension)
- d.addCallback(self._got_uri_extension)
- d.addCallback(self._get_hashtrees)
- d.addCallback(self._create_validated_buckets)
+ d.addCallback(self._get_crypttext_hash_tree)
# once we know that, we can download blocks from everybody
d.addCallback(self._download_all_segments)
def _finished(res):
for sharenum, bucket in buckets.iteritems():
b = layout.ReadBucketProxy(bucket, peerid, self._si_s)
self.add_share_bucket(sharenum, b)
- self._uri_extension_sources.append(b)
+
if self._results:
if peerid not in self._results.servermap:
self._results.servermap[peerid] = set()
shnum = vbucket.sharenum
del self.active_buckets[shnum]
s = self._share_vbuckets[shnum]
- # s is a set of ValidatedBucket instances
+ # s is a set of ValidatedReadBucketProxy instances
s.remove(vbucket)
# ... which might now be empty
if not s:
del self._share_vbuckets[shnum]
def _got_all_shareholders(self, res):
+ assert isinstance(self._uri, uri.CHKFileURI), (self._uri, type(self._uri))
if self._results:
now = time.time()
self._results.timings["peer_selection"] = now - self._started
- if len(self._share_buckets) < self._num_needed_shares:
+ if len(self._share_buckets) < self._uri.needed_shares:
raise NotEnoughSharesError
#for s in self._share_vbuckets.values():
# for vb in s:
- # assert isinstance(vb, ValidatedBucket), \
- # "vb is %s but should be a ValidatedBucket" % (vb,)
-
- def _unpack_uri_extension_data(self, data):
- return uri.unpack_extension(data)
+ # assert isinstance(vb, ValidatedReadBucketProxy), \
+ # "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
def _obtain_uri_extension(self, ignored):
+ assert isinstance(self._uri, uri.CHKFileURI), self._uri
# all shareholders are supposed to have a copy of uri_extension, and
# all are supposed to be identical. We compute the hash of the data
# that comes back, and compare it against the version in our URI. If they
# match, then we use it.
if self._status:
self._status.set_status("Obtaining URI Extension")
- self._uri_extension_fetch_started = time.time()
- def _validate(proposal, bucket):
- h = hashutil.uri_extension_hash(proposal)
- if h != self._uri_extension_hash:
- self._fetch_failures["uri_extension"] += 1
- msg = ("The copy of uri_extension we received from "
- "%s was bad: wanted %s, got %s" %
- (bucket,
- base32.b2a(self._uri_extension_hash),
- base32.b2a(h)))
- self.log(msg, level=log.SCARY, umid="jnkTtQ")
- raise BadURIExtensionHashValue(msg)
- return self._unpack_uri_extension_data(proposal)
- return self._obtain_validated_thing(None,
- self._uri_extension_sources,
- "uri_extension",
- "get_uri_extension", (), _validate)
-
- def _obtain_validated_thing(self, ignored, sources, name, methname, args,
- validatorfunc):
- if not sources:
- raise NotEnoughSharesError("started with zero peers while fetching "
- "%s" % name)
- bucket = sources[0]
- sources = sources[1:]
- #d = bucket.callRemote(methname, *args)
- d = bucket.startIfNecessary()
- d.addCallback(lambda res: getattr(bucket, methname)(*args))
- d.addCallback(validatorfunc, bucket)
- def _bad(f):
- level = log.WEIRD
- if f.check(DeadReferenceError):
- level = log.UNUSUAL
- self.log(format="operation %(op)s from vbucket %(vbucket)s failed",
- op=name, vbucket=str(bucket),
- failure=f, level=level, umid="JGXxBA")
- if not sources:
- raise NotEnoughSharesError("ran out of peers, last error was %s"
- % (f,))
- # try again with a different one
- return self._obtain_validated_thing(None, sources, name,
- methname, args, validatorfunc)
- d.addErrback(_bad)
- return d
+ uri_extension_fetch_started = time.time()
- def _got_uri_extension(self, uri_extension_data):
- if self._results:
- elapsed = time.time() - self._uri_extension_fetch_started
- self._results.timings["uri_extension"] = elapsed
+ vups = []
+ for sharenum, bucket in self._share_buckets:
+ vups.append(ValidatedExtendedURIProxy(bucket, self._uri.get_verifier(), self._fetch_failures))
+ vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._log_number)
+ d = vto.start()
- d = self._uri_extension_data = uri_extension_data
+ def _got_uri_extension(vup):
+ precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
+ if self._results:
+ elapsed = time.time() - uri_extension_fetch_started
+ self._results.timings["uri_extension"] = elapsed
- self._codec = codec.get_decoder_by_name(d['codec_name'])
- self._codec.set_serialized_params(d['codec_params'])
- self._tail_codec = codec.get_decoder_by_name(d['codec_name'])
- self._tail_codec.set_serialized_params(d['tail_codec_params'])
+ self._vup = vup
+ self._codec = codec.CRSDecoder()
+ self._codec.set_params(self._vup.segment_size, self._uri.needed_shares, self._uri.total_shares)
+ self._tail_codec = codec.CRSDecoder()
+ self._tail_codec.set_params(self._vup.tail_segment_size, self._uri.needed_shares, self._uri.total_shares)
- crypttext_hash = d.get('crypttext_hash', None) # optional
- if crypttext_hash:
- assert isinstance(crypttext_hash, str)
- assert len(crypttext_hash) == 32
- self._crypttext_hash = crypttext_hash
- self._plaintext_hash = d.get('plaintext_hash', None) # optional
+ self._current_segnum = 0
- self._roothash = d['share_root_hash']
+ self._share_hashtree = hashtree.IncompleteHashTree(self._uri.total_shares)
+ self._share_hashtree.set_hashes({0: vup.share_root_hash})
- self._segment_size = segment_size = d['segment_size']
- self._total_segments = mathutil.div_ceil(self._size, segment_size)
- self._current_segnum = 0
+ self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
+ self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
+ d.addCallback(_got_uri_extension)
+ return d
- self._share_hashtree = hashtree.IncompleteHashTree(d['total_shares'])
- self._share_hashtree.set_hashes({0: self._roothash})
+ def _get_crypttext_hash_tree(self, res):
+ vchtps = []
+ for sharenum, bucket in self._share_buckets:
+ vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._fetch_failures)
+ vchtps.append(vchtp)
- def _get_hashtrees(self, res):
- self._get_hashtrees_started = time.time()
+ _get_crypttext_hash_tree_started = time.time()
if self._status:
- self._status.set_status("Retrieving Hash Trees")
- d = defer.maybeDeferred(self._get_plaintext_hashtrees)
- d.addCallback(self._get_crypttext_hashtrees)
- d.addCallback(self._setup_hashtrees)
- return d
+ self._status.set_status("Retrieving crypttext hash tree")
- def _get_plaintext_hashtrees(self):
- # plaintext hashes are optional. If the root isn't in the UEB, then
- # the share will be holding an empty list. We don't even bother
- # fetching it.
- if "plaintext_root_hash" not in self._uri_extension_data:
- self._plaintext_hashtree = None
- return
- def _validate_plaintext_hashtree(proposal, bucket):
- if proposal[0] != self._uri_extension_data['plaintext_root_hash']:
- self._fetch_failures["plaintext_hashroot"] += 1
- msg = ("The copy of the plaintext_root_hash we received from"
- " %s was bad" % bucket)
- raise BadPlaintextHashValue(msg)
- pt_hashtree = hashtree.IncompleteHashTree(self._total_segments)
- pt_hashes = dict(list(enumerate(proposal)))
- try:
- pt_hashtree.set_hashes(pt_hashes)
- except hashtree.BadHashError:
- # the hashes they gave us were not self-consistent, even
- # though the root matched what we saw in the uri_extension
- # block
- self._fetch_failures["plaintext_hashtree"] += 1
- raise
- self._plaintext_hashtree = pt_hashtree
- d = self._obtain_validated_thing(None,
- self._uri_extension_sources,
- "plaintext_hashes",
- "get_plaintext_hashes", (),
- _validate_plaintext_hashtree)
- return d
+ vto = ValidatedThingObtainer(vchtps, debugname="vchtps", log_id=self._log_number)
+ d = vto.start()
- def _get_crypttext_hashtrees(self, res):
- # Ciphertext hash tree root is mandatory, so that there is at
- # most one ciphertext that matches this read-cap or
- # verify-cap. The integrity check on the shares is not
- # sufficient to prevent the original encoder from creating
- # some shares of file A and other shares of file B.
- if "crypttext_root_hash" not in self._uri_extension_data:
- raise BadURIExtension("URI Extension block did not have the ciphertext hash tree root")
- def _validate_crypttext_hashtree(proposal, bucket):
- if proposal[0] != self._uri_extension_data['crypttext_root_hash']:
- self._fetch_failures["crypttext_hashroot"] += 1
- msg = ("The copy of the crypttext_root_hash we received from"
- " %s was bad" % bucket)
- raise BadCrypttextHashValue(msg)
- ct_hashtree = hashtree.IncompleteHashTree(self._total_segments)
- ct_hashes = dict(list(enumerate(proposal)))
- try:
- ct_hashtree.set_hashes(ct_hashes)
- except hashtree.BadHashError:
- self._fetch_failures["crypttext_hashtree"] += 1
- raise
- ct_hashtree.set_hashes(ct_hashes)
- self._crypttext_hashtree = ct_hashtree
- d = self._obtain_validated_thing(None,
- self._uri_extension_sources,
- "crypttext_hashes",
- "get_crypttext_hashes", (),
- _validate_crypttext_hashtree)
+ def _got_crypttext_hash_tree(res):
+ self._crypttext_hash_tree = res._crypttext_hash_tree
+ self._output.got_crypttext_hash_tree(self._crypttext_hash_tree)
+ if self._results:
+ elapsed = time.time() - _get_crypttext_hash_tree_started
+ self._results.timings["hashtrees"] = elapsed
+ d.addCallback(_got_crypttext_hash_tree)
return d
- def _setup_hashtrees(self, res):
- self._output.setup_hashtrees(self._plaintext_hashtree,
- self._crypttext_hashtree)
- if self._results:
- elapsed = time.time() - self._get_hashtrees_started
- self._results.timings["hashtrees"] = elapsed
-
- def _create_validated_buckets(self, ignored=None):
- self._share_vbuckets = {}
- for sharenum, bucket in self._share_buckets:
- vbucket = ValidatedBucket(sharenum, bucket,
- self._share_hashtree,
- self._roothash,
- self._total_segments)
- s = self._share_vbuckets.setdefault(sharenum, set())
- s.add(vbucket)
-
def _activate_enough_buckets(self):
- """either return a mapping from shnum to a ValidatedBucket that can
+ """either return a mapping from shnum to a ValidatedReadBucketProxy that can
provide data for that share, or raise NotEnoughSharesError"""
+ assert isinstance(self._uri, uri.CHKFileURI), self._uri
- while len(self.active_buckets) < self._num_needed_shares:
+ while len(self.active_buckets) < self._uri.needed_shares:
# need some more
handled_shnums = set(self.active_buckets.keys())
available_shnums = set(self._share_vbuckets.keys())
def _download_all_segments(self, res):
- # the promise: upon entry to this function, self._share_vbuckets
- # contains enough buckets to complete the download, and some extra
- # ones to tolerate some buckets dropping out or having errors.
- # self._share_vbuckets is a dictionary that maps from shnum to a set
- # of ValidatedBuckets, which themselves are wrappers around
- # RIBucketReader references.
- self.active_buckets = {} # k: shnum, v: ValidatedBucket instance
+ for sharenum, bucket in self._share_buckets:
+ vbucket = ValidatedReadBucketProxy(sharenum, bucket,
+ self._share_hashtree,
+ self._vup.share_root_hash,
+ self._vup.num_segments)
+ s = self._share_vbuckets.setdefault(sharenum, set())
+ s.add(vbucket)
+
+ # after the above code, self._share_vbuckets contains enough
+ # buckets to complete the download, and some extra ones to
+ # tolerate some buckets dropping out or having
+ # errors. self._share_vbuckets is a dictionary that maps from
+ # shnum to a set of ValidatedReadBucketProxies, which themselves are
+ # wrappers around RIBucketReader references.
+ self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
self._started_fetching = time.time()
d = defer.succeed(None)
- for segnum in range(self._total_segments-1):
+ for segnum in range(self._vup.num_segments-1):
d.addCallback(self._download_segment, segnum)
# this pause, at the end of write, prevents pre-fetch from
# happening until the consumer is ready for more data.
d.addCallback(self._check_for_pause)
- d.addCallback(self._download_tail_segment, self._total_segments-1)
+ d.addCallback(self._download_tail_segment, self._vup.num_segments-1)
return d
def _check_for_pause(self, res):
return res
def _download_segment(self, res, segnum):
+ assert isinstance(self._uri, uri.CHKFileURI), self._uri
if self._status:
self._status.set_status("Downloading segment %d of %d" %
- (segnum+1, self._total_segments))
+ (segnum+1, self._vup.num_segments))
self.log("downloading seg#%d of %d (%d%%)"
- % (segnum, self._total_segments,
- 100.0 * segnum / self._total_segments))
+ % (segnum, self._vup.num_segments,
+ 100.0 * segnum / self._vup.num_segments))
# memory footprint: when the SegmentDownloader finishes pulling down
# all shares, we have 1*segment_size of usage.
- segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares,
+ segmentdler = SegmentDownloader(self, segnum, self._uri.needed_shares,
self._results)
started = time.time()
d = segmentdler.start()
return d
def _download_tail_segment(self, res, segnum):
+ assert isinstance(self._uri, uri.CHKFileURI), self._uri
self.log("downloading seg#%d of %d (%d%%)"
- % (segnum, self._total_segments,
- 100.0 * segnum / self._total_segments))
- segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares,
+ % (segnum, self._vup.num_segments,
+ 100.0 * segnum / self._vup.num_segments))
+ segmentdler = SegmentDownloader(self, segnum, self._uri.needed_shares,
self._results)
started = time.time()
d = segmentdler.start()
del buffers
# we never send empty segments. If the data was an exact multiple
# of the segment size, the last segment will be full.
- pad_size = mathutil.pad_size(self._size, self._segment_size)
- tail_size = self._segment_size - pad_size
+ pad_size = mathutil.pad_size(self._uri.size, self._vup.segment_size)
+ tail_size = self._vup.segment_size - pad_size
segment = segment[:tail_size]
started_decrypt = time.time()
self._output.write_segment(segment)
return d
def _done(self, res):
+ assert isinstance(self._uri, uri.CHKFileURI), self._uri
self.log("download done")
if self._results:
now = time.time()
self._results.timings["total"] = now - self._started
self._results.timings["segments"] = now - self._started_fetching
self._output.close()
- if self.check_crypttext_hash and self._crypttext_hash:
- _assert(self._crypttext_hash == self._output.crypttext_hash,
+ if self._vup.crypttext_hash:
+ _assert(self._vup.crypttext_hash == self._output.crypttext_hash,
"bad crypttext_hash: computed=%s, expected=%s" %
(base32.b2a(self._output.crypttext_hash),
- base32.b2a(self._crypttext_hash)))
- if self.check_plaintext_hash and self._plaintext_hash:
- _assert(self._plaintext_hash == self._output.plaintext_hash,
- "bad plaintext_hash: computed=%s, expected=%s" %
- (base32.b2a(self._output.plaintext_hash),
- base32.b2a(self._plaintext_hash)))
- _assert(self._output.length == self._size,
- got=self._output.length, expected=self._size)
+ base32.b2a(self._vup.crypttext_hash)))
+ _assert(self._output.length == self._uri.size,
+ got=self._output.length, expected=self._uri.size)
return self._output.finish()
def get_download_status(self):
implements(IDownloadTarget, IConsumer)
def __init__(self, consumer):
self._consumer = consumer
- self._when_finished = observer.OneShotObserverList()
-
- def when_finished(self):
- # I think this is unused, along with self._when_finished . But I need
- # to trace the error paths to be sure.
- return self._when_finished.when_fired()
def registerProducer(self, producer, streaming):
self._consumer.registerProducer(producer, streaming)
def write(self, data):
self._consumer.write(data)
def close(self):
- self._when_finished.fire(None)
+ pass
def fail(self, why):
- self._when_finished.fire(why)
+ pass
def register_canceller(self, cb):
pass
def finish(self):
-
from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer, reactor
from foolscap import eventual
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
-from allmydata.util import hashutil
+from allmydata.util import base32, hashutil
from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
import common_util as testutil
def log(self, *args, **kwargs):
pass
-class FakeBucketWriterProxy:
+class FakeBucketReaderWriterProxy:
implements(IStorageBucketWriter, IStorageBucketReader)
# these are used for both reading and writing
def __init__(self, mode="good"):
def get_plaintext_hashes(self):
def _try():
hashes = self.plaintext_hashes[:]
- if self.mode == "bad plaintext hashroot":
- hashes[0] = flip_bit(hashes[0])
- if self.mode == "bad plaintext hash":
- hashes[1] = flip_bit(hashes[1])
return hashes
return defer.maybeDeferred(_try)
assert length <= len(data)
return data[:length]
+class ValidatedExtendedURIProxy(unittest.TestCase):
+ K = 4
+ M = 10
+ SIZE = 200
+ SEGSIZE = 72
+ _TMP = SIZE%SEGSIZE # tail data: 200 % 72 = 56 bytes
+ if _TMP == 0:
+ _TMP = SEGSIZE
+ if _TMP % K != 0:
+ _TMP += (K - (_TMP % K)) # pad the tail up to a multiple of K
+ TAIL_SEGSIZE = _TMP # 56: already a multiple of K=4
+ _TMP = SIZE / SEGSIZE
+ if SIZE % SEGSIZE != 0:
+ _TMP += 1
+ NUM_SEGMENTS = _TMP # ceil(200/72) == 3
+ mindict = { 'segment_size': SEGSIZE,
+ 'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
+ 'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
+ optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
+ 'codec_name': "crs",
+ 'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
+ 'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
+ 'num_segments': NUM_SEGMENTS,
+ 'size': SIZE,
+ 'needed_shares': K,
+ 'total_shares': M,
+ 'plaintext_hash': "anything",
+ 'plaintext_root_hash': "anything", }
+ # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
+ optional_inconsistent = { 'crypttext_hash': (77,),
+ 'codec_name': ("digital fountain", ""),
+ 'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
+ "%d-%d-%d" % (SEGSIZE-1, K, M),
+ "%d-%d-%d" % (SEGSIZE, K, M-1)),
+ 'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
+ "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
+ "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
+ 'num_segments': (NUM_SEGMENTS-1,),
+ 'size': (SIZE-1,),
+ 'needed_shares': (K-1,),
+ 'total_shares': (M-1,), }
+
+ def _test(self, uebdict):
+ uebstring = uri.pack_extension(uebdict)
+ uebhash = hashutil.uri_extension_hash(uebstring)
+ fb = FakeBucketReaderWriterProxy()
+ fb.put_uri_extension(uebstring)
+ verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE)
+ vup = download.ValidatedExtendedURIProxy(fb, verifycap)
+ return vup.start()
+
+ def _test_accept(self, uebdict):
+ return self._test(uebdict)
+
+ def _should_fail(self, res, expected_failures):
+ if isinstance(res, Failure):
+ res.trap(*expected_failures)
+ else:
+ self.fail("was supposed to raise %s, not get '%s'" % (expected_failures, res))
+
+ def _test_reject(self, uebdict):
+ d = self._test(uebdict)
+ d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
+ return d
+
+ def test_accept_minimal(self):
+ return self._test_accept(self.mindict)
+
+ def test_reject_insufficient(self):
+ dl = []
+ for k in self.mindict.iterkeys():
+ insuffdict = self.mindict.copy()
+ del insuffdict[k]
+ d = self._test_reject(insuffdict)
+ dl.append(d)
+ return defer.DeferredList(dl)
+
+ def test_accept_optional(self):
+ dl = []
+ for k in self.optional_consistent.iterkeys():
+ mydict = self.mindict.copy()
+ mydict[k] = self.optional_consistent[k]
+ d = self._test_accept(mydict)
+ dl.append(d)
+ return defer.DeferredList(dl)
+
+ def test_reject_optional(self):
+ dl = []
+ for k in self.optional_inconsistent.iterkeys():
+ for v in self.optional_inconsistent[k]:
+ mydict = self.mindict.copy()
+ mydict[k] = v
+ d = self._test_reject(mydict)
+ dl.append(d)
+ return defer.DeferredList(dl)
+
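
[Reviewer note: a sketch, not part of this patch, of the pack/unpack round-trip the tests above rely on; the field values are illustrative.]

from allmydata import uri
from allmydata.util import hashutil

ueb = uri.pack_extension({'segment_size': 72,
                          'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
                          'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE})
assert uri.unpack_extension(ueb)['segment_size'] == 72
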
class Encode(unittest.TestCase):
def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
shareholders = {}
for shnum in range(NUM_SHARES):
- peer = FakeBucketWriterProxy()
+ peer = FakeBucketReaderWriterProxy()
shareholders[shnum] = peer
all_shareholders.append(peer)
e.set_shareholders(shareholders)
all_peers = []
for shnum in range(NUM_SHARES):
mode = bucket_modes.get(shnum, "good")
- peer = FakeBucketWriterProxy(mode)
+ peer = FakeBucketReaderWriterProxy(mode)
shareholders[shnum] = peer
e.set_shareholders(shareholders)
return e.start()
needed_shares=required_shares,
total_shares=num_shares,
size=file_size)
- URI = u.to_string()
client = FakeClient()
if not target:
target = download.Data()
- fd = download.FileDownloader(client, URI, target)
+ fd = download.FileDownloader(client, u, target)
# we manually cycle the FileDownloader through a number of steps that
# would normally be sequenced by a Deferred chain in
# Make it possible to obtain uri_extension from the shareholders.
# Arrange for shareholders[0] to be the first, so we can selectively
# corrupt the data it returns.
- fd._uri_extension_sources = shareholders.values()
- fd._uri_extension_sources.remove(shareholders[0])
- fd._uri_extension_sources.insert(0, shareholders[0])
+ uri_extension_sources = shareholders.values()
+ uri_extension_sources.remove(shareholders[0])
+ uri_extension_sources.insert(0, shareholders[0])
d = defer.succeed(None)
# replace everybody's crypttext hash trees with a different one
# (computed over a different file), then modify our uri_extension
# to reflect the new crypttext hash tree root
- def _corrupt_crypttext_hashes(uri_extension):
- assert isinstance(uri_extension, dict)
- assert 'crypttext_root_hash' in uri_extension
+ def _corrupt_crypttext_hashes(unused):
+ assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
+ assert fd._vup.crypttext_root_hash, fd._vup
badhash = hashutil.tagged_hash("bogus", "data")
- bad_crypttext_hashes = [badhash] * uri_extension['num_segments']
+ bad_crypttext_hashes = [badhash] * fd._vup.num_segments
badtree = hashtree.HashTree(bad_crypttext_hashes)
for bucket in shareholders.values():
bucket.crypttext_hashes = list(badtree)
- uri_extension['crypttext_root_hash'] = badtree[0]
- return uri_extension
+ fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
+ fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
+ return fd._vup
d.addCallback(_corrupt_crypttext_hashes)
- elif "omit_crypttext_root_hash" in recover_mode:
- # make it as though the crypttext hash tree root had not
- # been in the UEB
- def _remove_crypttext_hashes(uri_extension):
- assert isinstance(uri_extension, dict)
- assert 'crypttext_root_hash' in uri_extension
- del uri_extension['crypttext_root_hash']
- return uri_extension
- d.addCallback(_remove_crypttext_hashes)
-
- d.addCallback(fd._got_uri_extension)
# also have the FileDownloader ask for hash trees
- d.addCallback(fd._get_hashtrees)
+ d.addCallback(fd._get_crypttext_hash_tree)
- d.addCallback(fd._create_validated_buckets)
d.addCallback(fd._download_all_segments)
d.addCallback(fd._done)
def _done(newdata):
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure))
- self.failUnless(res.check(NotEnoughSharesError))
+ self.failUnless(res.check(NotEnoughSharesError), res)
d.addBoth(_done)
return d
def assertFetchFailureIn(self, fd, where):
expected = {"uri_extension": 0,
- "plaintext_hashroot": 0,
- "plaintext_hashtree": 0,
- "crypttext_hashroot": 0,
- "crypttext_hashtree": 0,
+ "crypttext_hash_tree": 0,
}
if where is not None:
expected[where] += 1
d.addCallback(self.assertFetchFailureIn, "uri_extension")
return d
- def OFF_test_bad_plaintext_hashroot(self):
- # the first server has a bad plaintext hashroot, so we will fail over
- # to a different server.
- modemap = dict([(i, "bad plaintext hashroot") for i in range(1)] +
- [(i, "good") for i in range(1, 10)])
- d = self.send_and_recover((4,8,10), bucket_modes=modemap)
- d.addCallback(self.assertFetchFailureIn, "plaintext_hashroot")
- return d
-
def test_bad_crypttext_hashroot(self):
# the first server has a bad crypttext hashroot, so we will fail
# over to a different server.
modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
[(i, "good") for i in range(1, 10)])
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
- d.addCallback(self.assertFetchFailureIn, "crypttext_hashroot")
- return d
-
- def OFF_test_bad_plaintext_hashes(self):
- # the first server has a bad plaintext hash block, so we will fail
- # over to a different server.
- modemap = dict([(i, "bad plaintext hash") for i in range(1)] +
- [(i, "good") for i in range(1, 10)])
- d = self.send_and_recover((4,8,10), bucket_modes=modemap)
- d.addCallback(self.assertFetchFailureIn, "plaintext_hashtree")
+ d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
return d
def test_bad_crypttext_hashes(self):
modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
[(i, "good") for i in range(1, 10)])
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
- d.addCallback(self.assertFetchFailureIn, "crypttext_hashtree")
+ d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
return d
def test_bad_crypttext_hashes_failure(self):
d.addBoth(_done)
return d
- def test_omitted_crypttext_hashes_failure(self):
- # Test that the downloader requires a Merkle Tree over the
- # ciphertext (per http://crisp.cs.du.edu/?q=node/88 -- the
- # problem that checking the integrity of the shares could let
- # more than one immutable file match the same read-cap).
- modemap = dict([(i, "good") for i in range(0, 10)])
- d = self.send_and_recover((4,8,10), bucket_modes=modemap,
- recover_mode=("omit_crypttext_root_hash"))
- def _done(res):
- self.failUnless(isinstance(res, Failure))
- self.failUnless(res.check(download.BadURIExtension), res)
- d.addBoth(_done)
- return d
-
def OFF_test_bad_plaintext(self):
# faking a decryption failure is easier: just corrupt the key
modemap = dict([(i, "good") for i in range(0, 10)])
self.failUnless(res.check(NotEnoughSharesError))
d.addBoth(_done)
return d
-