From: Brian Warner Date: Wed, 4 Aug 2010 07:26:39 +0000 (-0700) Subject: Rewrite immutable downloader (#798). This patch rearranges the rest of src/allmydata... X-Git-Tag: allmydata-tahoe-1.8.0b2~20 X-Git-Url: https://git.rkrishnan.org/pf/content/en/seg/class-simplejson.JSONDecoder.html?a=commitdiff_plain;h=797828f47fe1aa4490cb622552975afad7809f03;p=tahoe-lafs%2Ftahoe-lafs.git Rewrite immutable downloader (#798). This patch rearranges the rest of src/allmydata/immutable/ . --- diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py index 2f2d8f12..cd5c5568 100644 --- a/src/allmydata/immutable/checker.py +++ b/src/allmydata/immutable/checker.py @@ -1,16 +1,444 @@ +from zope.interface import implements +from twisted.internet import defer from foolscap.api import DeadReferenceError, RemoteException +from allmydata import hashtree, codec, uri +from allmydata.interfaces import IValidatedThingProxy, IVerifierURI from allmydata.hashtree import IncompleteHashTree from allmydata.check_results import CheckResults -from allmydata.immutable import download from allmydata.uri import CHKFileVerifierURI from allmydata.util.assertutil import precondition -from allmydata.util import base32, idlib, deferredutil, dictutil, log +from allmydata.util import base32, idlib, deferredutil, dictutil, log, mathutil from allmydata.util.hashutil import file_renewal_secret_hash, \ file_cancel_secret_hash, bucket_renewal_secret_hash, \ - bucket_cancel_secret_hash + bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \ + block_hash from allmydata.immutable import layout +class IntegrityCheckReject(Exception): + pass +class BadURIExtension(IntegrityCheckReject): + pass +class BadURIExtensionHashValue(IntegrityCheckReject): + pass +class BadOrMissingHash(IntegrityCheckReject): + pass +class UnsupportedErasureCodec(BadURIExtension): + pass + +class ValidatedExtendedURIProxy: + implements(IValidatedThingProxy) + """ I am a front-end for a remote UEB (using a local ReadBucketProxy), + responsible for retrieving and validating the elements from the UEB.""" + + def __init__(self, readbucketproxy, verifycap, fetch_failures=None): + # fetch_failures is for debugging -- see test_encode.py + self._fetch_failures = fetch_failures + self._readbucketproxy = readbucketproxy + precondition(IVerifierURI.providedBy(verifycap), verifycap) + self._verifycap = verifycap + + # required + self.segment_size = None + self.crypttext_root_hash = None + self.share_root_hash = None + + # computed + self.block_size = None + self.share_size = None + self.num_segments = None + self.tail_data_size = None + self.tail_segment_size = None + + # optional + self.crypttext_hash = None + + def __str__(self): + return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string()) + + def _check_integrity(self, data): + h = uri_extension_hash(data) + if h != self._verifycap.uri_extension_hash: + msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" % + (self._readbucketproxy, + base32.b2a(self._verifycap.uri_extension_hash), + base32.b2a(h))) + if self._fetch_failures is not None: + self._fetch_failures["uri_extension"] += 1 + raise BadURIExtensionHashValue(msg) + else: + return data + + def _parse_and_validate(self, data): + self.share_size = mathutil.div_ceil(self._verifycap.size, + self._verifycap.needed_shares) + + d = uri.unpack_extension(data) + + # There are several kinds of things that can be found in a UEB. + # First, things that we really need to learn from the UEB in order to + # do this download. Next: things which are optional but not redundant + # -- if they are present in the UEB they will get used. Next, things + # that are optional and redundant. These things are required to be + # consistent: they don't have to be in the UEB, but if they are in + # the UEB then they will be checked for consistency with the + # already-known facts, and if they are inconsistent then an exception + # will be raised. These things aren't actually used -- they are just + # tested for consistency and ignored. Finally: things which are + # deprecated -- they ought not be in the UEB at all, and if they are + # present then a warning will be logged but they are otherwise + # ignored. + + # First, things that we really need to learn from the UEB: + # segment_size, crypttext_root_hash, and share_root_hash. + self.segment_size = d['segment_size'] + + self.block_size = mathutil.div_ceil(self.segment_size, + self._verifycap.needed_shares) + self.num_segments = mathutil.div_ceil(self._verifycap.size, + self.segment_size) + + self.tail_data_size = self._verifycap.size % self.segment_size + if not self.tail_data_size: + self.tail_data_size = self.segment_size + # padding for erasure code + self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, + self._verifycap.needed_shares) + + # Ciphertext hash tree root is mandatory, so that there is at most + # one ciphertext that matches this read-cap or verify-cap. The + # integrity check on the shares is not sufficient to prevent the + # original encoder from creating some shares of file A and other + # shares of file B. + self.crypttext_root_hash = d['crypttext_root_hash'] + + self.share_root_hash = d['share_root_hash'] + + + # Next: things that are optional and not redundant: crypttext_hash + if d.has_key('crypttext_hash'): + self.crypttext_hash = d['crypttext_hash'] + if len(self.crypttext_hash) != CRYPTO_VAL_SIZE: + raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),)) + + + # Next: things that are optional, redundant, and required to be + # consistent: codec_name, codec_params, tail_codec_params, + # num_segments, size, needed_shares, total_shares + if d.has_key('codec_name'): + if d['codec_name'] != "crs": + raise UnsupportedErasureCodec(d['codec_name']) + + if d.has_key('codec_params'): + ucpss, ucpns, ucpts = codec.parse_params(d['codec_params']) + if ucpss != self.segment_size: + raise BadURIExtension("inconsistent erasure code params: " + "ucpss: %s != self.segment_size: %s" % + (ucpss, self.segment_size)) + if ucpns != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent erasure code params: ucpns: %s != " + "self._verifycap.needed_shares: %s" % + (ucpns, self._verifycap.needed_shares)) + if ucpts != self._verifycap.total_shares: + raise BadURIExtension("inconsistent erasure code params: ucpts: %s != " + "self._verifycap.total_shares: %s" % + (ucpts, self._verifycap.total_shares)) + + if d.has_key('tail_codec_params'): + utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params']) + if utcpss != self.tail_segment_size: + raise BadURIExtension("inconsistent erasure code params: utcpss: %s != " + "self.tail_segment_size: %s, self._verifycap.size: %s, " + "self.segment_size: %s, self._verifycap.needed_shares: %s" + % (utcpss, self.tail_segment_size, self._verifycap.size, + self.segment_size, self._verifycap.needed_shares)) + if utcpns != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent erasure code params: utcpns: %s != " + "self._verifycap.needed_shares: %s" % (utcpns, + self._verifycap.needed_shares)) + if utcpts != self._verifycap.total_shares: + raise BadURIExtension("inconsistent erasure code params: utcpts: %s != " + "self._verifycap.total_shares: %s" % (utcpts, + self._verifycap.total_shares)) + + if d.has_key('num_segments'): + if d['num_segments'] != self.num_segments: + raise BadURIExtension("inconsistent num_segments: size: %s, " + "segment_size: %s, computed_num_segments: %s, " + "ueb_num_segments: %s" % (self._verifycap.size, + self.segment_size, + self.num_segments, d['num_segments'])) + + if d.has_key('size'): + if d['size'] != self._verifycap.size: + raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" % + (self._verifycap.size, d['size'])) + + if d.has_key('needed_shares'): + if d['needed_shares'] != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB " + "needed shares: %s" % (self._verifycap.total_shares, + d['needed_shares'])) + + if d.has_key('total_shares'): + if d['total_shares'] != self._verifycap.total_shares: + raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB " + "total shares: %s" % (self._verifycap.total_shares, + d['total_shares'])) + + # Finally, things that are deprecated and ignored: plaintext_hash, + # plaintext_root_hash + if d.get('plaintext_hash'): + log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons " + "and is no longer used. Ignoring. %s" % (self,)) + if d.get('plaintext_root_hash'): + log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security " + "reasons and is no longer used. Ignoring. %s" % (self,)) + + return self + + def start(self): + """Fetch the UEB from bucket, compare its hash to the hash from + verifycap, then parse it. Returns a deferred which is called back + with self once the fetch is successful, or is erred back if it + fails.""" + d = self._readbucketproxy.get_uri_extension() + d.addCallback(self._check_integrity) + d.addCallback(self._parse_and_validate) + return d + +class ValidatedReadBucketProxy(log.PrefixingLogMixin): + """I am a front-end for a remote storage bucket, responsible for + retrieving and validating data from that bucket. + + My get_block() method is used by BlockDownloaders. + """ + + def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, + block_size, share_size): + """ share_hash_tree is required to have already been initialized with + the root hash (the number-0 hash), using the share_root_hash from the + UEB""" + precondition(share_hash_tree[0] is not None, share_hash_tree) + prefix = "%d-%s-%s" % (sharenum, bucket, + base32.b2a_l(share_hash_tree[0][:8], 60)) + log.PrefixingLogMixin.__init__(self, + facility="tahoe.immutable.download", + prefix=prefix) + self.sharenum = sharenum + self.bucket = bucket + self.share_hash_tree = share_hash_tree + self.num_blocks = num_blocks + self.block_size = block_size + self.share_size = share_size + self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks) + + def get_all_sharehashes(self): + """Retrieve and validate all the share-hash-tree nodes that are + included in this share, regardless of whether we need them to + validate the share or not. Each share contains a minimal Merkle tree + chain, but there is lots of overlap, so usually we'll be using hashes + from other shares and not reading every single hash from this share. + The Verifier uses this function to read and validate every single + hash from this share. + + Call this (and wait for the Deferred it returns to fire) before + calling get_block() for the first time: this lets us check that the + share share contains enough hashes to validate its own data, and + avoids downloading any share hash twice. + + I return a Deferred which errbacks upon failure, probably with + BadOrMissingHash.""" + + d = self.bucket.get_share_hashes() + def _got_share_hashes(sh): + sharehashes = dict(sh) + try: + self.share_hash_tree.set_hashes(sharehashes) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_share_hashes) + return d + + def get_all_blockhashes(self): + """Retrieve and validate all the block-hash-tree nodes that are + included in this share. Each share contains a full Merkle tree, but + we usually only fetch the minimal subset necessary for any particular + block. This function fetches everything at once. The Verifier uses + this function to validate the block hash tree. + + Call this (and wait for the Deferred it returns to fire) after + calling get_all_sharehashes() and before calling get_block() for the + first time: this lets us check that the share contains all block + hashes and avoids downloading them multiple times. + + I return a Deferred which errbacks upon failure, probably with + BadOrMissingHash. + """ + + # get_block_hashes(anything) currently always returns everything + needed = list(range(len(self.block_hash_tree))) + d = self.bucket.get_block_hashes(needed) + def _got_block_hashes(blockhashes): + if len(blockhashes) < len(self.block_hash_tree): + raise BadOrMissingHash() + bh = dict(enumerate(blockhashes)) + + try: + self.block_hash_tree.set_hashes(bh) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_block_hashes) + return d + + def get_all_crypttext_hashes(self, crypttext_hash_tree): + """Retrieve and validate all the crypttext-hash-tree nodes that are + in this share. Normally we don't look at these at all: the download + process fetches them incrementally as needed to validate each segment + of ciphertext. But this is a convenient place to give the Verifier a + function to validate all of these at once. + + Call this with a new hashtree object for each share, initialized with + the crypttext hash tree root. I return a Deferred which errbacks upon + failure, probably with BadOrMissingHash. + """ + + # get_crypttext_hashes() always returns everything + d = self.bucket.get_crypttext_hashes() + def _got_crypttext_hashes(hashes): + if len(hashes) < len(crypttext_hash_tree): + raise BadOrMissingHash() + ct_hashes = dict(enumerate(hashes)) + try: + crypttext_hash_tree.set_hashes(ct_hashes) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_crypttext_hashes) + return d + + def get_block(self, blocknum): + # the first time we use this bucket, we need to fetch enough elements + # of the share hash tree to validate it from our share hash up to the + # hashroot. + if self.share_hash_tree.needed_hashes(self.sharenum): + d1 = self.bucket.get_share_hashes() + else: + d1 = defer.succeed([]) + + # We might need to grab some elements of our block hash tree, to + # validate the requested block up to the share hash. + blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True) + # We don't need the root of the block hash tree, as that comes in the + # share tree. + blockhashesneeded.discard(0) + d2 = self.bucket.get_block_hashes(blockhashesneeded) + + if blocknum < self.num_blocks-1: + thisblocksize = self.block_size + else: + thisblocksize = self.share_size % self.block_size + if thisblocksize == 0: + thisblocksize = self.block_size + d3 = self.bucket.get_block_data(blocknum, + self.block_size, thisblocksize) + + dl = deferredutil.gatherResults([d1, d2, d3]) + dl.addCallback(self._got_data, blocknum) + return dl + + def _got_data(self, results, blocknum): + precondition(blocknum < self.num_blocks, + self, blocknum, self.num_blocks) + sharehashes, blockhashes, blockdata = results + try: + sharehashes = dict(sharehashes) + except ValueError, le: + le.args = tuple(le.args + (sharehashes,)) + raise + blockhashes = dict(enumerate(blockhashes)) + + candidate_share_hash = None # in case we log it in the except block below + blockhash = None # in case we log it in the except block below + + try: + if self.share_hash_tree.needed_hashes(self.sharenum): + # This will raise exception if the values being passed do not + # match the root node of self.share_hash_tree. + try: + self.share_hash_tree.set_hashes(sharehashes) + except IndexError, le: + # Weird -- sharehashes contained index numbers outside of + # the range that fit into this hash tree. + raise BadOrMissingHash(le) + + # To validate a block we need the root of the block hash tree, + # which is also one of the leafs of the share hash tree, and is + # called "the share hash". + if not self.block_hash_tree[0]: # empty -- no root node yet + # Get the share hash from the share hash tree. + share_hash = self.share_hash_tree.get_leaf(self.sharenum) + if not share_hash: + # No root node in block_hash_tree and also the share hash + # wasn't sent by the server. + raise hashtree.NotEnoughHashesError + self.block_hash_tree.set_hashes({0: share_hash}) + + if self.block_hash_tree.needed_hashes(blocknum): + self.block_hash_tree.set_hashes(blockhashes) + + blockhash = block_hash(blockdata) + self.block_hash_tree.set_hashes(leaves={blocknum: blockhash}) + #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d " + # "%r .. %r: %s" % + # (self.sharenum, blocknum, len(blockdata), + # blockdata[:50], blockdata[-50:], base32.b2a(blockhash))) + + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + # log.WEIRD: indicates undetected disk/network error, or more + # likely a programming error + self.log("hash failure in block=%d, shnum=%d on %s" % + (blocknum, self.sharenum, self.bucket)) + if self.block_hash_tree.needed_hashes(blocknum): + self.log(""" failure occurred when checking the block_hash_tree. + This suggests that either the block data was bad, or that the + block hashes we received along with it were bad.""") + else: + self.log(""" the failure probably occurred when checking the + share_hash_tree, which suggests that the share hashes we + received from the remote peer were bad.""") + self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash)) + self.log(" block length: %d" % len(blockdata)) + self.log(" block hash: %s" % base32.b2a_or_none(blockhash)) + if len(blockdata) < 100: + self.log(" block data: %r" % (blockdata,)) + else: + self.log(" block data start/end: %r .. %r" % + (blockdata[:50], blockdata[-50:])) + self.log(" share hash tree:\n" + self.share_hash_tree.dump()) + self.log(" block hash tree:\n" + self.block_hash_tree.dump()) + lines = [] + for i,h in sorted(sharehashes.items()): + lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) + self.log(" sharehashes:\n" + "\n".join(lines) + "\n") + lines = [] + for i,h in blockhashes.items(): + lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) + log.msg(" blockhashes:\n" + "\n".join(lines) + "\n") + raise BadOrMissingHash(le) + + # If we made it here, the block is good. If the hash trees didn't + # like what they saw, they would have raised a BadHashError, causing + # our caller to see a Failure and thus ignore this block (as well as + # dropping this bucket). + return blockdata + + class Checker(log.PrefixingLogMixin): """I query all servers to see if M uniquely-numbered shares are available. @@ -85,7 +513,9 @@ class Checker(log.PrefixingLogMixin): level = log.WEIRD if f.check(DeadReferenceError): level = log.UNUSUAL - self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ") + self.log("failure from server on 'get_buckets' the REMOTE failure was:", + facility="tahoe.immutable.checker", + failure=f, level=level, umid="AX7wZQ") return ({}, serverid, False) d.addCallbacks(_wrap_results, _trap_errs) @@ -146,18 +576,18 @@ class Checker(log.PrefixingLogMixin): vcap = self._verifycap b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index()) - veup = download.ValidatedExtendedURIProxy(b, vcap) + veup = ValidatedExtendedURIProxy(b, vcap) d = veup.start() def _got_ueb(vup): share_hash_tree = IncompleteHashTree(vcap.total_shares) share_hash_tree.set_hashes({0: vup.share_root_hash}) - vrbp = download.ValidatedReadBucketProxy(sharenum, b, - share_hash_tree, - vup.num_segments, - vup.block_size, - vup.share_size) + vrbp = ValidatedReadBucketProxy(sharenum, b, + share_hash_tree, + vup.num_segments, + vup.block_size, + vup.share_size) # note: normal download doesn't use get_all_sharehashes(), # because it gets more data than necessary. We've discussed the @@ -216,8 +646,8 @@ class Checker(log.PrefixingLogMixin): return (False, sharenum, 'incompatible') elif f.check(layout.LayoutInvalid, layout.RidiculouslyLargeURIExtensionBlock, - download.BadOrMissingHash, - download.BadURIExtensionHashValue): + BadOrMissingHash, + BadURIExtensionHashValue): return (False, sharenum, 'corrupt') # if it wasn't one of those reasons, re-raise the error diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index 70044a7d..1d5be948 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -1,234 +1,81 @@ -import copy, os.path, stat -from cStringIO import StringIO + +import binascii +import copy +import time +now = time.time from zope.interface import implements from twisted.internet import defer -from twisted.internet.interfaces import IPushProducer -from twisted.protocols import basic -from foolscap.api import eventually -from allmydata.interfaces import IImmutableFileNode, ICheckable, \ - IDownloadTarget, IUploadResults -from allmydata.util import dictutil, log, base32 -from allmydata.uri import CHKFileURI, LiteralFileURI -from allmydata.immutable.checker import Checker -from allmydata.check_results import CheckResults, CheckAndRepairResults -from allmydata.immutable.repairer import Repairer -from allmydata.immutable import download - -class _ImmutableFileNodeBase(object): - implements(IImmutableFileNode, ICheckable) - - def get_write_uri(self): - return None - - def get_readonly_uri(self): - return self.get_uri() - - def is_mutable(self): - return False - - def is_readonly(self): - return True - - def is_unknown(self): - return False - - def is_allowed_in_immutable_directory(self): - return True - - def raise_error(self): - pass - - def __hash__(self): - return self.u.__hash__() - def __eq__(self, other): - if isinstance(other, _ImmutableFileNodeBase): - return self.u.__eq__(other.u) - else: - return False - def __ne__(self, other): - if isinstance(other, _ImmutableFileNodeBase): - return self.u.__eq__(other.u) - else: - return True - -class PortionOfFile: - # like a list slice (things[2:14]), but for a file on disk - def __init__(self, fn, offset=0, size=None): - self.f = open(fn, "rb") - self.f.seek(offset) - self.bytes_left = size - - def read(self, size=None): - # bytes_to_read = min(size, self.bytes_left), but None>anything - if size is None: - bytes_to_read = self.bytes_left - elif self.bytes_left is None: - bytes_to_read = size - else: - bytes_to_read = min(size, self.bytes_left) - data = self.f.read(bytes_to_read) - if self.bytes_left is not None: - self.bytes_left -= len(data) - return data - -class DownloadCache: - implements(IDownloadTarget) - - def __init__(self, filecap, storage_index, downloader, - cachedirectorymanager): - self._downloader = downloader - self._uri = filecap - self._storage_index = storage_index - self.milestones = set() # of (offset,size,Deferred) - self.cachedirectorymanager = cachedirectorymanager - self.cachefile = None - self.download_in_progress = False - # five states: - # new ImmutableFileNode, no downloads ever performed - # new ImmutableFileNode, leftover file (partial) - # new ImmutableFileNode, leftover file (whole) - # download in progress, not yet complete - # download complete - - def when_range_available(self, offset, size): - assert isinstance(offset, (int,long)) - assert isinstance(size, (int,long)) - - d = defer.Deferred() - self.milestones.add( (offset,size,d) ) - self._check_milestones() - if self.milestones and not self.download_in_progress: - self.download_in_progress = True - log.msg(format=("immutable filenode read [%(si)s]: " + - "starting download"), - si=base32.b2a(self._storage_index), - umid="h26Heg", level=log.OPERATIONAL) - d2 = self._downloader.download(self._uri, self) - d2.addBoth(self._download_done) - d2.addErrback(self._download_failed) - d2.addErrback(log.err, umid="cQaM9g") - return d - - def read(self, consumer, offset, size): - assert offset+size <= self.get_filesize() - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - f = PortionOfFile(self.cachefile.get_filename(), offset, size) - d = basic.FileSender().beginFileTransfer(f, consumer) - d.addCallback(lambda lastSent: consumer) - return d - - def _download_done(self, res): - # clear download_in_progress, so failed downloads can be re-tried - self.download_in_progress = False - return res - - def _download_failed(self, f): - # tell anyone who's waiting that we failed - for m in self.milestones: - (offset,size,d) = m - eventually(d.errback, f) - self.milestones.clear() - - def _check_milestones(self): - current_size = self.get_filesize() - for m in list(self.milestones): - (offset,size,d) = m - if offset+size <= current_size: - log.msg(format=("immutable filenode read [%(si)s] " + - "%(offset)d+%(size)d vs %(filesize)d: " + - "done"), - si=base32.b2a(self._storage_index), - offset=offset, size=size, filesize=current_size, - umid="nuedUg", level=log.NOISY) - self.milestones.discard(m) - eventually(d.callback, None) - else: - log.msg(format=("immutable filenode read [%(si)s] " + - "%(offset)d+%(size)d vs %(filesize)d: " + - "still waiting"), - si=base32.b2a(self._storage_index), - offset=offset, size=size, filesize=current_size, - umid="8PKOhg", level=log.NOISY) - - def get_filesize(self): - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - try: - filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE] - except OSError: - filesize = 0 - return filesize - - - def open(self, size): - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - self.f = open(self.cachefile.get_filename(), "wb") - - def write(self, data): - self.f.write(data) - self._check_milestones() - - def close(self): - self.f.close() - self._check_milestones() - - def fail(self, why): - pass - def register_canceller(self, cb): - pass - def finish(self): - return None - # The following methods are just because the target might be a - # repairer.DownUpConnector, and just because the current CHKUpload object - # expects to find the storage index and encoding parameters in its - # Uploadable. - def set_storageindex(self, storageindex): - pass - def set_encodingparams(self, encodingparams): - pass +from twisted.internet.interfaces import IConsumer +from allmydata.interfaces import IImmutableFileNode, IUploadResults +from allmydata import uri +from allmydata.check_results import CheckResults, CheckAndRepairResults +from allmydata.util.dictutil import DictOfSets +from pycryptopp.cipher.aes import AES -class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): - def __init__(self, filecap, storage_broker, secret_holder, - downloader, history, cachedirectorymanager): - assert isinstance(filecap, CHKFileURI) - self.u = filecap +# local imports +from allmydata.immutable.checker import Checker +from allmydata.immutable.repairer import Repairer +from allmydata.immutable.downloader.node import DownloadNode +from allmydata.immutable.downloader.status import DownloadStatus + +class CiphertextFileNode: + def __init__(self, verifycap, storage_broker, secret_holder, + terminator, history, download_status=None): + assert isinstance(verifycap, uri.CHKFileVerifierURI) + self._verifycap = verifycap self._storage_broker = storage_broker self._secret_holder = secret_holder - self._downloader = downloader - self._history = history - storage_index = self.get_storage_index() - self.download_cache = DownloadCache(filecap, storage_index, downloader, - cachedirectorymanager) - prefix = self.u.get_verify_cap().to_string() - log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix) - self.log("starting", level=log.OPERATIONAL) + if download_status is None: + ds = DownloadStatus(verifycap.storage_index, verifycap.size) + if history: + history.add_download(ds) + download_status = ds + self._node = DownloadNode(verifycap, storage_broker, secret_holder, + terminator, history, download_status) + + def read(self, consumer, offset=0, size=None, read_ev=None): + """I am the main entry point, from which FileNode.read() can get + data. I feed the consumer with the desired range of ciphertext. I + return a Deferred that fires (with the consumer) when the read is + finished.""" + return self._node.read(consumer, offset, size, read_ev) + + def get_segment(self, segnum): + """Begin downloading a segment. I return a tuple (d, c): 'd' is a + Deferred that fires with (offset,data) when the desired segment is + available, and c is an object on which c.cancel() can be called to + disavow interest in the segment (after which 'd' will never fire). + + You probably need to know the segment size before calling this, + unless you want the first few bytes of the file. If you ask for a + segment number which turns out to be too large, the Deferred will + errback with BadSegmentNumberError. + + The Deferred fires with the offset of the first byte of the data + segment, so that you can call get_segment() before knowing the + segment size, and still know which data you received. + """ + return self._node.get_segment(segnum) + + def get_segment_size(self): + # return a Deferred that fires with the file's real segment size + return self._node.get_segsize() - def get_size(self): - return self.u.get_size() - def get_current_size(self): - return defer.succeed(self.get_size()) - - def get_cap(self): - return self.u - def get_readcap(self): - return self.u.get_readonly() + def get_storage_index(self): + return self._verifycap.storage_index def get_verify_cap(self): - return self.u.get_verify_cap() - def get_repair_cap(self): - # CHK files can be repaired with just the verifycap - return self.u.get_verify_cap() + return self._verifycap + def get_size(self): + return self._verifycap.size - def get_uri(self): - return self.u.to_string() + def raise_error(self): + pass - def get_storage_index(self): - return self.u.get_storage_index() def check_and_repair(self, monitor, verify=False, add_lease=False): - verifycap = self.get_verify_cap() + verifycap = self._verifycap + storage_index = verifycap.storage_index sb = self._storage_broker servers = sb.get_all_servers() sh = self._secret_holder @@ -238,7 +85,7 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): monitor=monitor) d = c.start() def _maybe_repair(cr): - crr = CheckAndRepairResults(self.u.get_storage_index()) + crr = CheckAndRepairResults(storage_index) crr.pre_repair_results = cr if cr.is_healthy(): crr.post_repair_results = cr @@ -248,24 +95,25 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): crr.repair_successful = False # until proven successful def _gather_repair_results(ur): assert IUploadResults.providedBy(ur), ur - # clone the cr -- check results to form the basic of the prr -- post-repair results + # clone the cr (check results) to form the basis of the + # prr (post-repair results) prr = CheckResults(cr.uri, cr.storage_index) prr.data = copy.deepcopy(cr.data) sm = prr.data['sharemap'] - assert isinstance(sm, dictutil.DictOfSets), sm + assert isinstance(sm, DictOfSets), sm sm.update(ur.sharemap) servers_responding = set(prr.data['servers-responding']) servers_responding.union(ur.sharemap.iterkeys()) prr.data['servers-responding'] = list(servers_responding) prr.data['count-shares-good'] = len(sm) prr.data['count-good-share-hosts'] = len(sm) - is_healthy = bool(len(sm) >= self.u.total_shares) - is_recoverable = bool(len(sm) >= self.u.needed_shares) + is_healthy = bool(len(sm) >= verifycap.total_shares) + is_recoverable = bool(len(sm) >= verifycap.needed_shares) prr.set_healthy(is_healthy) prr.set_recoverable(is_recoverable) crr.repair_successful = is_healthy - prr.set_needs_rebalancing(len(sm) >= self.u.total_shares) + prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares) crr.post_repair_results = prr return crr @@ -275,8 +123,8 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): crr.repair_successful = False crr.repair_failure = f return f - r = Repairer(storage_broker=sb, secret_holder=sh, - verifycap=verifycap, monitor=monitor) + r = Repairer(self, storage_broker=sb, secret_holder=sh, + monitor=monitor) d = r.start() d.addCallbacks(_gather_repair_results, _repair_error) return d @@ -285,7 +133,7 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): return d def check(self, monitor, verify=False, add_lease=False): - verifycap = self.get_verify_cap() + verifycap = self._verifycap sb = self._storage_broker servers = sb.get_all_servers() sh = self._secret_holder @@ -295,81 +143,130 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): monitor=monitor) return v.start() + +class DecryptingConsumer: + """I sit between a CiphertextDownloader (which acts as a Producer) and + the real Consumer, decrypting everything that passes by. The real + Consumer sees the real Producer, but the Producer sees us instead of the + real consumer.""" + implements(IConsumer) + + def __init__(self, consumer, readkey, offset, read_event): + self._consumer = consumer + self._read_event = read_event + # TODO: pycryptopp CTR-mode needs random-access operations: I want + # either a=AES(readkey, offset) or better yet both of: + # a=AES(readkey, offset=0) + # a.process(ciphertext, offset=xyz) + # For now, we fake it with the existing iv= argument. + offset_big = offset // 16 + offset_small = offset % 16 + iv = binascii.unhexlify("%032x" % offset_big) + self._decryptor = AES(readkey, iv=iv) + self._decryptor.process("\x00"*offset_small) + + def registerProducer(self, producer, streaming): + # this passes through, so the real consumer can flow-control the real + # producer. Therefore we don't need to provide any IPushProducer + # methods. We implement all the IConsumer methods as pass-throughs, + # and only intercept write() to perform decryption. + self._consumer.registerProducer(producer, streaming) + def unregisterProducer(self): + self._consumer.unregisterProducer() + def write(self, ciphertext): + started = now() + plaintext = self._decryptor.process(ciphertext) + elapsed = now() - started + self._read_event.update(0, elapsed, 0) + self._consumer.write(plaintext) + +class ImmutableFileNode: + implements(IImmutableFileNode) + + # I wrap a CiphertextFileNode with a decryption key + def __init__(self, filecap, storage_broker, secret_holder, terminator, + history): + assert isinstance(filecap, uri.CHKFileURI) + verifycap = filecap.get_verify_cap() + ds = DownloadStatus(verifycap.storage_index, verifycap.size) + if history: + history.add_download(ds) + self._download_status = ds + self._cnode = CiphertextFileNode(verifycap, storage_broker, + secret_holder, terminator, history, ds) + assert isinstance(filecap, uri.CHKFileURI) + self.u = filecap + self._readkey = filecap.key + + # TODO: I'm not sure about this.. what's the use case for node==node? If + # we keep it here, we should also put this on CiphertextFileNode + def __hash__(self): + return self.u.__hash__() + def __eq__(self, other): + if isinstance(other, ImmutableFileNode): + return self.u.__eq__(other.u) + else: + return False + def __ne__(self, other): + if isinstance(other, ImmutableFileNode): + return self.u.__eq__(other.u) + else: + return True + def read(self, consumer, offset=0, size=None): - self.log("read", offset=offset, size=size, - umid="UPP8FA", level=log.OPERATIONAL) - if size is None: - size = self.get_size() - offset - size = min(size, self.get_size() - offset) - - if offset == 0 and size == self.get_size(): - # don't use the cache, just do a normal streaming download - self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL) - target = download.ConsumerAdapter(consumer) - return self._downloader.download(self.get_cap(), target, - self._parentmsgid, - history=self._history) - - d = self.download_cache.when_range_available(offset, size) - d.addCallback(lambda res: - self.download_cache.read(consumer, offset, size)) + actual_size = size + if actual_size == None: + actual_size = self.u.size + actual_size = actual_size - offset + read_ev = self._download_status.add_read_event(offset,actual_size, + now()) + decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev) + d = self._cnode.read(decryptor, offset, size, read_ev) + d.addCallback(lambda dc: consumer) return d -class LiteralProducer: - implements(IPushProducer) - def resumeProducing(self): - pass - def stopProducing(self): + def raise_error(self): pass + def get_write_uri(self): + return None -class LiteralFileNode(_ImmutableFileNodeBase): - - def __init__(self, filecap): - assert isinstance(filecap, LiteralFileURI) - self.u = filecap - - def get_size(self): - return len(self.u.data) - def get_current_size(self): - return defer.succeed(self.get_size()) + def get_readonly_uri(self): + return self.get_uri() + def get_uri(self): + return self.u.to_string() def get_cap(self): return self.u def get_readcap(self): - return self.u + return self.u.get_readonly() def get_verify_cap(self): - return None + return self.u.get_verify_cap() def get_repair_cap(self): - return None - - def get_uri(self): - return self.u.to_string() + # CHK files can be repaired with just the verifycap + return self.u.get_verify_cap() def get_storage_index(self): - return None + return self.u.get_storage_index() - def check(self, monitor, verify=False, add_lease=False): - return defer.succeed(None) + def get_size(self): + return self.u.get_size() + def get_current_size(self): + return defer.succeed(self.get_size()) - def check_and_repair(self, monitor, verify=False, add_lease=False): - return defer.succeed(None) + def is_mutable(self): + return False - def read(self, consumer, offset=0, size=None): - if size is None: - data = self.u.data[offset:] - else: - data = self.u.data[offset:offset+size] - - # We use twisted.protocols.basic.FileSender, which only does - # non-streaming, i.e. PullProducer, where the receiver/consumer must - # ask explicitly for each chunk of data. There are only two places in - # the Twisted codebase that can't handle streaming=False, both of - # which are in the upload path for an FTP/SFTP server - # (protocols.ftp.FileConsumer and - # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is - # likely to be used as the target for a Tahoe download. - - d = basic.FileSender().beginFileTransfer(StringIO(data), consumer) - d.addCallback(lambda lastSent: consumer) - return d + def is_readonly(self): + return True + + def is_unknown(self): + return False + + def is_allowed_in_immutable_directory(self): + return True + + def check_and_repair(self, monitor, verify=False, add_lease=False): + return self._cnode.check_and_repair(monitor, verify, add_lease) + def check(self, monitor, verify=False, add_lease=False): + return self._cnode.check(monitor, verify, add_lease) diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py index 6e07da7b..27fb8445 100644 --- a/src/allmydata/immutable/layout.py +++ b/src/allmydata/immutable/layout.py @@ -74,12 +74,16 @@ limitations described in #346. # they are still provided when writing so that older versions of Tahoe can # read them. +FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares + def make_write_bucket_proxy(rref, data_size, block_size, num_segments, num_share_hashes, uri_extension_size_max, nodeid): # Use layout v1 for small files, so they'll be readable by older versions # (= self.next_read_lens[0]) - or self._closed_to_pusher): - nrd = self.next_read_ds.popleft() - nrl = self.next_read_lens.popleft() - - # Pick out the requested number of bytes from self.bufs, turn it - # into a string, and callback the deferred with that. - res = [] - ressize = 0 - while ressize < nrl and self.bufs: - nextbuf = self.bufs.popleft() - res.append(nextbuf) - ressize += len(nextbuf) - if ressize > nrl: - extra = ressize - nrl - self.bufs.appendleft(nextbuf[:-extra]) - res[-1] = nextbuf[:-extra] - assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl) - assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl) - self.bufsiz -= nrl - if self.bufsiz < self.buflim and self.producer: - self.producer.resumeProducing() - nrd.callback(res) - - # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From - # the perspective of a downloader I am an IDownloadTarget and an - # IConsumer.) - def registerProducer(self, producer, streaming): - assert streaming # We know how to handle only streaming producers. - self.producer = producer # the downloader - def unregisterProducer(self): - self.producer = None - def open(self, size): - self.size = size - self._size_osol.fire(self.size) - def set_encodingparams(self, encodingparams): - self.encodingparams = encodingparams - self._encodingparams_osol.fire(self.encodingparams) - def set_storageindex(self, storageindex): - self.storageindex = storageindex - self._storageindex_osol.fire(self.storageindex) - def write(self, data): - precondition(data) # please don't write empty strings - self.bufs.append(data) - self.bufsiz += len(data) - self._satisfy_reads_if_possible() - if self.bufsiz >= self.buflim and self.producer: - self.producer.pauseProducing() - def finish(self): - pass - def close(self): - self._closed_to_pusher = True - # Any reads which haven't been satisfied by now are going to - # have to be satisfied with short reads. - self._satisfy_reads_if_possible() # methods to satisfy the IEncryptedUploader interface # (From the perspective of an uploader I am an IEncryptedUploadable.) def set_upload_status(self, upload_status): self.upload_status = upload_status def get_size(self): - if hasattr(self, 'size'): # attribute created by self.open() - return defer.succeed(self.size) - else: - return self._size_osol.when_fired() + size = self._filenode.get_size() + assert size is not None + return defer.succeed(size) def get_all_encoding_parameters(self): - # We have to learn the encoding params from pusher. - if hasattr(self, 'encodingparams'): - # attribute created by self.set_encodingparams() - return defer.succeed(self.encodingparams) - else: - return self._encodingparams_osol.when_fired() + return defer.succeed(self._encodingparams) def read_encrypted(self, length, hash_only): - """Returns a deferred which eventually fired with the requested - ciphertext.""" + """Returns a deferred which eventually fires with the requested + ciphertext, as a list of strings.""" precondition(length) # please don't ask to read 0 bytes - d = defer.Deferred() - self.next_read_ds.append(d) - self.next_read_lens.append(length) - self._satisfy_reads_if_possible() + mc = consumer.MemoryConsumer() + d = self._filenode.read(mc, self._offset, length) + self._offset += length + d.addCallback(lambda ign: mc.chunks) return d def get_storage_index(self): - # We have to learn the storage index from pusher. - if hasattr(self, 'storageindex'): - # attribute created by self.set_storageindex() - return defer.succeed(self.storageindex) - else: - return self._storageindex.when_fired() + return self._filenode.get_storage_index() + def close(self): + pass diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index f0d3b0bb..5dd257bd 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -20,7 +20,8 @@ from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ - NoServersError, InsufficientVersionError, UploadUnhappinessError + NoServersError, InsufficientVersionError, UploadUnhappinessError, \ + DEFAULT_MAX_SEGMENT_SIZE from allmydata.immutable import layout from pycryptopp.cipher.aes import AES @@ -1234,7 +1235,8 @@ class AssistedUploader: return self._upload_status class BaseUploadable: - default_max_segment_size = 128*KiB # overridden by max_segment_size + # this is overridden by max_segment_size + default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE default_encoding_param_k = 3 # overridden by encoding_parameters default_encoding_param_happy = 7 default_encoding_param_n = 10