From: Zooko O'Whielacronx Date: Mon, 5 Jan 2009 16:51:45 +0000 (-0700) Subject: immutable: refactor downloader to be more reusable for checker/verifier/repairer... X-Git-Url: https://git.rkrishnan.org/simplejson/components/com_hotproperty/somewhere?a=commitdiff_plain;h=778167c2b1926063149e27edc08b4df5e83f4d2a;p=tahoe-lafs%2Ftahoe-lafs.git immutable: refactor downloader to be more reusable for checker/verifier/repairer (and better) The code for validating the share hash tree and the block hash tree has been rewritten to make sure it handles all cases, to share metadata about the file (such as the share hash tree, block hash trees, and UEB) among different share downloads, and not to require hashes to be stored on the server unnecessarily, such as the roots of the block hash trees (not needed since they are also the leaves of the share hash tree), and the root of the share hash tree (not needed since it is also included in the UEB). It also passes the latest tests including handling corrupted shares well. ValidatedReadBucketProxy takes a share_hash_tree argument to its constructor, which is a reference to a share hash tree shared by all ValidatedReadBucketProxies for that immutable file download. ValidatedReadBucketProxy requires the block_size and share_size to be provided in its constructor, and it then uses those to compute the offsets and lengths of blocks when it needs them, instead of reading those values out of the share. The user of ValidatedReadBucketProxy therefore has to have first used a ValidatedExtendedURIProxy to compute those two values from the validated contents of the URI. This is pleasingly simplifies safety analysis: the client knows which span of bytes corresponds to a given block from the validated URI data, rather than from the unvalidated data stored on the storage server. It also simplifies unit testing of verifier/repairer, because now it doesn't care about the contents of the "share size" and "block size" fields in the share. It does not relieve the need for share data v2 layout, because we still need to store and retrieve the offsets of the fields which come after the share data, therefore we still need to use share data v2 with its 8-byte fields if we want to store share data larger than about 2^32. Specify which subset of the block hashes and share hashes you need while downloading a particular share. In the future this will hopefully be used to fetch only a subset, for network efficiency, but currently all of them are fetched, regardless of which subset you specify. ReadBucketProxy hides the question of whether it has "started" or not (sent a request to the server to get metadata) from its user. Download is optimized to do as few roundtrips and as few requests as possible, hopefully speeding up download a bit. --- diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index 4e84bdc5..595818ec 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -7,7 +7,7 @@ from twisted.application import service from foolscap import DeadReferenceError from foolscap.eventual import eventually -from allmydata.util import base32, mathutil, hashutil, log +from allmydata.util import base32, deferredutil, mathutil, hashutil, log from allmydata.util.assertutil import _assert, precondition from allmydata.util.rrefutil import ServerFailure from allmydata import codec, hashtree, uri @@ -134,7 +134,8 @@ class ValidatedThingObtainer: if not self._validatedthingproxies: raise NotEnoughSharesError("ran out of peers, last error was %s" % (f,)) # try again with a different one - return self._try_the_next_one() + d = self._try_the_next_one() + return d def _try_the_next_one(self): vtp = self._validatedthingproxies.pop(0) @@ -167,8 +168,7 @@ class ValidatedCrypttextHashTreeProxy: return self def start(self): - d = self._readbucketproxy.startIfNecessary() - d.addCallback(lambda ignored: self._readbucketproxy.get_crypttext_hashes()) + d = self._readbucketproxy.get_crypttext_hashes() d.addCallback(self._validate) return d @@ -333,97 +333,104 @@ class ValidatedExtendedURIProxy: """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse it. Returns a deferred which is called back with self once the fetch is successful, or is erred back if it fails. """ - d = self._readbucketproxy.startIfNecessary() - d.addCallback(lambda ignored: self._readbucketproxy.get_uri_extension()) + d = self._readbucketproxy.get_uri_extension() d.addCallback(self._check_integrity) d.addCallback(self._parse_and_validate) return d class ValidatedReadBucketProxy(log.PrefixingLogMixin): - """I am a front-end for a remote storage bucket, responsible for - retrieving and validating data from that bucket. + """I am a front-end for a remote storage bucket, responsible for retrieving and validating + data from that bucket. My get_block() method is used by BlockDownloaders. """ - def __init__(self, sharenum, bucket, - share_hash_tree, share_root_hash, - num_blocks): - """ share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """ + def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size): + """ share_hash_tree is required to have already been initialized with the root hash + (the number-0 hash), using the share_root_hash from the UEB """ + precondition(share_hash_tree[0] is not None, share_hash_tree) prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60)) log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) self.sharenum = sharenum self.bucket = bucket - self._share_hash = None # None means not validated yet self.share_hash_tree = share_hash_tree - self._share_root_hash = share_root_hash - self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks) - self.started = False + self.num_blocks = num_blocks + self.block_size = block_size + self.share_size = share_size + self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks) def get_block(self, blocknum): - if not self.started: - d = self.bucket.start() - def _started(res): - self.started = True - return self.get_block(blocknum) - d.addCallback(_started) - return d - # the first time we use this bucket, we need to fetch enough elements # of the share hash tree to validate it from our share hash up to the # hashroot. - if not self._share_hash: + if self.share_hash_tree.needed_hashes(self.sharenum): d1 = self.bucket.get_share_hashes() else: d1 = defer.succeed([]) - # we might need to grab some elements of our block hash tree, to - # validate the requested block up to the share hash - needed = self.block_hash_tree.needed_hashes(blocknum) - if needed: - # TODO: get fewer hashes, use get_block_hashes(needed) - d2 = self.bucket.get_block_hashes() - else: - d2 = defer.succeed([]) - - d3 = self.bucket.get_block(blocknum) + # We might need to grab some elements of our block hash tree, to + # validate the requested block up to the share hash. + blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True) + # We don't need the root of the block hash tree, as that comes in the share tree. + blockhashesneeded.discard(0) + d2 = self.bucket.get_block_hashes(blockhashesneeded) - d = defer.gatherResults([d1, d2, d3]) - d.addCallback(self._got_data, blocknum) - return d + if blocknum < self.num_blocks-1: + thisblocksize = self.block_size + else: + thisblocksize = self.share_size % self.block_size + if thisblocksize == 0: + thisblocksize = self.block_size + d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize) + + dl = deferredutil.gatherResults([d1, d2, d3]) + dl.addCallback(self._got_data, blocknum) + return dl + + def _got_data(self, results, blocknum): + precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks) + sharehashes, blockhashes, blockdata = results + try: + sharehashes = dict(sharehashes) + except ValueError, le: + le.args = tuple(le.args + (sharehashes,)) + raise + blockhashes = dict(enumerate(blockhashes)) - def _got_data(self, res, blocknum): - sharehashes, blockhashes, blockdata = res - blockhash = None # to make logging it safe + candidate_share_hash = None # in case we log it in the except block below + blockhash = None # in case we log it in the except block below try: - if not self._share_hash: - sh = dict(sharehashes) - sh[0] = self._share_root_hash # always use our own root, from the URI - sht = self.share_hash_tree - if sht.get_leaf_index(self.sharenum) not in sh: - raise hashtree.NotEnoughHashesError - sht.set_hashes(sh) - self._share_hash = sht.get_leaf(self.sharenum) + if self.share_hash_tree.needed_hashes(self.sharenum): + # This will raise exception if the values being passed do not match the root + # node of self.share_hash_tree. + self.share_hash_tree.set_hashes(sharehashes) + + # To validate a block we need the root of the block hash tree, which is also one of + # the leafs of the share hash tree, and is called "the share hash". + if not self.block_hash_tree[0]: # empty -- no root node yet + # Get the share hash from the share hash tree. + share_hash = self.share_hash_tree.get_leaf(self.sharenum) + if not share_hash: + raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server. + self.block_hash_tree.set_hashes({0: share_hash}) + + if self.block_hash_tree.needed_hashes(blocknum): + self.block_hash_tree.set_hashes(blockhashes) blockhash = hashutil.block_hash(blockdata) + self.block_hash_tree.set_hashes(leaves={blocknum: blockhash}) #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d " # "%r .. %r: %s" % # (self.sharenum, blocknum, len(blockdata), # blockdata[:50], blockdata[-50:], base32.b2a(blockhash))) - # we always validate the blockhash - bh = dict(enumerate(blockhashes)) - # replace blockhash root with validated value - bh[0] = self._share_hash - self.block_hash_tree.set_hashes(bh, {blocknum: blockhash}) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: # log.WEIRD: indicates undetected disk/network error, or more # likely a programming error self.log("hash failure in block=%d, shnum=%d on %s" % (blocknum, self.sharenum, self.bucket)) - if self._share_hash: + if self.block_hash_tree.needed_hashes(blocknum): self.log(""" failure occurred when checking the block_hash_tree. This suggests that either the block data was bad, or that the block hashes we received along with it were bad.""") @@ -431,7 +438,7 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin): self.log(""" the failure probably occurred when checking the share_hash_tree, which suggests that the share hashes we received from the remote peer were bad.""") - self.log(" have self._share_hash: %s" % bool(self._share_hash)) + self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash)) self.log(" block length: %d" % len(blockdata)) self.log(" block hash: %s" % base32.b2a_or_none(blockhash)) if len(blockdata) < 100: @@ -439,15 +446,14 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin): else: self.log(" block data start/end: %r .. %r" % (blockdata[:50], blockdata[-50:])) - self.log(" root hash: %s" % base32.b2a(self._share_root_hash)) self.log(" share hash tree:\n" + self.share_hash_tree.dump()) self.log(" block hash tree:\n" + self.block_hash_tree.dump()) lines = [] - for i,h in sorted(sharehashes): + for i,h in sorted(sharehashes.items()): lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) self.log(" sharehashes:\n" + "\n".join(lines) + "\n") lines = [] - for i,h in enumerate(blockhashes): + for i,h in blockhashes.items(): lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) log.msg(" blockhashes:\n" + "\n".join(lines) + "\n") raise BadOrMissingHash(le) @@ -495,7 +501,7 @@ class BlockDownloader(log.PrefixingLogMixin): self.parent.hold_block(self.blocknum, data) def _got_block_error(self, f): - failtype = f.trap(ServerFailure, IntegrityCheckReject, layout.LayoutInvalid) + failtype = f.trap(ServerFailure, IntegrityCheckReject, layout.LayoutInvalid, layout.ShareVersionIncompatible) if f.check(ServerFailure): level = log.UNUSUAL else: @@ -679,6 +685,7 @@ class FileDownloader(log.PrefixingLogMixin): self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, } + self._share_hash_tree = None self._crypttext_hash_tree = None def pauseProducing(self): @@ -840,8 +847,8 @@ class FileDownloader(log.PrefixingLogMixin): self._current_segnum = 0 - self._share_hashtree = hashtree.IncompleteHashTree(self._uri.total_shares) - self._share_hashtree.set_hashes({0: vup.share_root_hash}) + self._share_hash_tree = hashtree.IncompleteHashTree(self._uri.total_shares) + self._share_hash_tree.set_hashes({0: vup.share_root_hash}) self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments) self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash}) @@ -897,12 +904,8 @@ class FileDownloader(log.PrefixingLogMixin): def _download_all_segments(self, res): for sharenum, bucket in self._share_buckets: - vbucket = ValidatedReadBucketProxy(sharenum, bucket, - self._share_hashtree, - self._vup.share_root_hash, - self._vup.num_segments) - s = self._share_vbuckets.setdefault(sharenum, set()) - s.add(vbucket) + vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) + self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) # after the above code, self._share_vbuckets contains enough # buckets to complete the download, and some extra ones to diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py index 0945ded9..7bc4ce0f 100644 --- a/src/allmydata/immutable/encode.py +++ b/src/allmydata/immutable/encode.py @@ -283,7 +283,7 @@ class Encoder(object): self.set_status("Starting shareholders") dl = [] for shareid in self.landlords: - d = self.landlords[shareid].start() + d = self.landlords[shareid].put_header() d.addErrback(self._remove_shareholder, shareid, "start") dl.append(d) return self._gather_responses(dl) diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py index 1f5de5ab..d1c2da45 100644 --- a/src/allmydata/immutable/layout.py +++ b/src/allmydata/immutable/layout.py @@ -3,7 +3,7 @@ from zope.interface import implements from twisted.internet import defer from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \ FileTooLargeError, HASH_SIZE -from allmydata.util import mathutil, idlib +from allmydata.util import log, mathutil, idlib, observer from allmydata.util.assertutil import precondition from allmydata import storage @@ -29,8 +29,8 @@ big-endian offset values, which indicate where each section starts. Each offset the beginning of the share data. 0x00: version number (=00 00 00 01) -0x04: segment size -0x08: data size +0x04: block size # See Footnote 1 below. +0x08: share data size # See Footnote 1 below. 0x0c: offset of data (=00 00 00 24) 0x10: offset of plaintext_hash_tree UNUSED 0x14: offset of crypttext_hash_tree @@ -43,7 +43,7 @@ the beginning of the share data. ? : start of block_hashes ? : start of share_hashes each share_hash is written as a two-byte (big-endian) hashnum - followed by the 32-byte SHA-256 hash. We only store the hashes + followed by the 32-byte SHA-256 hash. We store only the hashes necessary to validate the share hash root ? : start of uri_extension_length (four-byte big-endian value) ? : start of uri_extension @@ -54,8 +54,8 @@ v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size limitations described in #346. 0x00: version number (=00 00 00 02) -0x04: segment size -0x0c: data size +0x04: block size # See Footnote 1 below. +0x0c: share data size # See Footnote 1 below. 0x14: offset of data (=00 00 00 00 00 00 00 44) 0x1c: offset of plaintext_hash_tree UNUSED 0x24: offset of crypttext_hash_tree @@ -68,23 +68,26 @@ limitations described in #346. ? : start of uri_extension_length (eight-byte big-endian value) """ +# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but they are still +# provided when writing so that older versions of Tahoe can read them. + def allocated_size(data_size, num_segments, num_share_hashes, - uri_extension_size): + uri_extension_size_max): wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes, - uri_extension_size, None) + uri_extension_size_max, None) uri_extension_starts_at = wbp._offsets['uri_extension'] - return uri_extension_starts_at + wbp.fieldsize + uri_extension_size + return uri_extension_starts_at + wbp.fieldsize + uri_extension_size_max class WriteBucketProxy: implements(IStorageBucketWriter) fieldsize = 4 fieldstruct = ">L" - def __init__(self, rref, data_size, segment_size, num_segments, - num_share_hashes, uri_extension_size, nodeid): + def __init__(self, rref, data_size, block_size, num_segments, + num_share_hashes, uri_extension_size_max, nodeid): self._rref = rref self._data_size = data_size - self._segment_size = segment_size + self._block_size = block_size self._num_segments = num_segments self._nodeid = nodeid @@ -92,14 +95,14 @@ class WriteBucketProxy: self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE # how many share hashes are included in each share? This will be # about ln2(num_shares). - self._share_hash_size = num_share_hashes * (2+HASH_SIZE) + self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE) # we commit to not sending a uri extension larger than this - self._uri_extension_size = uri_extension_size + self._uri_extension_size_max = uri_extension_size_max - self._create_offsets(segment_size, data_size) + self._create_offsets(block_size, data_size) - def _create_offsets(self, segment_size, data_size): - if segment_size >= 2**32 or data_size >= 2**32: + def _create_offsets(self, block_size, data_size): + if block_size >= 2**32 or data_size >= 2**32: raise FileTooLargeError("This file is too large to be uploaded (data_size).") offsets = self._offsets = {} @@ -113,7 +116,7 @@ class WriteBucketProxy: offsets['block_hashes'] = x x += self._segment_hash_size offsets['share_hashes'] = x - x += self._share_hash_size + x += self._share_hashtree_size offsets['uri_extension'] = x if x >= 2**32: @@ -121,7 +124,7 @@ class WriteBucketProxy: offset_data = struct.pack(">LLLLLLLLL", 1, # version number - segment_size, + block_size, data_size, offsets['data'], offsets['plaintext_hash_tree'], # UNUSED @@ -140,21 +143,21 @@ class WriteBucketProxy: nodeid_s = "[None]" return "" % nodeid_s - def start(self): + def put_header(self): return self._write(0, self._offset_data) def put_block(self, segmentnum, data): - offset = self._offsets['data'] + segmentnum * self._segment_size + offset = self._offsets['data'] + segmentnum * self._block_size assert offset + len(data) <= self._offsets['uri_extension'] assert isinstance(data, str) if segmentnum < self._num_segments-1: - precondition(len(data) == self._segment_size, - len(data), self._segment_size) + precondition(len(data) == self._block_size, + len(data), self._block_size) else: precondition(len(data) == (self._data_size - - (self._segment_size * + (self._block_size * (self._num_segments - 1))), - len(data), self._segment_size) + len(data), self._block_size) return self._write(offset, data) def put_crypttext_hashes(self, hashes): @@ -186,8 +189,8 @@ class WriteBucketProxy: assert isinstance(sharehashes, list) data = "".join([struct.pack(">H", hashnum) + hashvalue for hashnum,hashvalue in sharehashes]) - precondition(len(data) == self._share_hash_size, - len(data), self._share_hash_size) + precondition(len(data) == self._share_hashtree_size, + len(data), self._share_hashtree_size) precondition(offset + len(data) <= self._offsets['uri_extension'], offset, len(data), offset+len(data), self._offsets['uri_extension']) @@ -196,8 +199,8 @@ class WriteBucketProxy: def put_uri_extension(self, data): offset = self._offsets['uri_extension'] assert isinstance(data, str) - precondition(len(data) <= self._uri_extension_size, - len(data), self._uri_extension_size) + precondition(len(data) <= self._uri_extension_size_max, + len(data), self._uri_extension_size_max) length = struct.pack(self.fieldstruct, len(data)) return self._write(offset, length+data) @@ -215,8 +218,8 @@ class WriteBucketProxy_v2(WriteBucketProxy): fieldsize = 8 fieldstruct = ">Q" - def _create_offsets(self, segment_size, data_size): - if segment_size >= 2**64 or data_size >= 2**64: + def _create_offsets(self, block_size, data_size): + if block_size >= 2**64 or data_size >= 2**64: raise FileTooLargeError("This file is too large to be uploaded (data_size).") offsets = self._offsets = {} @@ -230,7 +233,7 @@ class WriteBucketProxy_v2(WriteBucketProxy): offsets['block_hashes'] = x x += self._segment_hash_size offsets['share_hashes'] = x - x += self._share_hash_size + x += self._share_hashtree_size offsets['uri_extension'] = x if x >= 2**64: @@ -238,7 +241,7 @@ class WriteBucketProxy_v2(WriteBucketProxy): offset_data = struct.pack(">LQQQQQQQQ", 2, # version number - segment_size, + block_size, data_size, offsets['data'], offsets['plaintext_hash_tree'], # UNUSED @@ -252,13 +255,17 @@ class WriteBucketProxy_v2(WriteBucketProxy): class ReadBucketProxy: implements(IStorageBucketReader) + + MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes + def __init__(self, rref, peerid, storage_index): self._rref = rref self._peerid = peerid peer_id_s = idlib.shortnodeid_b2a(peerid) storage_index_s = storage.si_b2a(storage_index) - self._reprstr = "" % (peer_id_s, storage_index_s) - self._started = False + self._reprstr = "" % (id(self), peer_id_s, storage_index_s) + self._started = False # sent request to server + self._ready = observer.OneShotObserverList() # got response from server def get_peerid(self): return self._peerid @@ -266,23 +273,28 @@ class ReadBucketProxy: def __repr__(self): return self._reprstr - def startIfNecessary(self): - if self._started: - return defer.succeed(self) - d = self.start() - d.addCallback(lambda res: self) - return d - - def start(self): - # TODO: for small shares, read the whole bucket in start() - d = self._read(0, 0x44) + def _start_if_needed(self): + """ Returns a deferred that will be fired when I'm ready to return data, or errbacks if + the starting (header reading and parsing) process fails.""" + if not self._started: + self._start() + return self._ready.when_fired() + + def _start(self): + self._started = True + # TODO: for small shares, read the whole bucket in _start() + d = self._fetch_header() d.addCallback(self._parse_offsets) - def _started(res): - self._started = True - return res - d.addCallback(_started) + d.addCallback(self._fetch_sharehashtree_and_ueb) + d.addCallback(self._parse_sharehashtree_and_ueb) + def _fail_waiters(f): + self._ready.fire(f) + d.addErrback(_fail_waiters) return d + def _fetch_header(self): + return self._read(0, 0x44) + def _parse_offsets(self, data): precondition(len(data) >= 0x4) self._offsets = {} @@ -295,15 +307,11 @@ class ReadBucketProxy: x = 0x0c fieldsize = 0x4 fieldstruct = ">L" - (self._segment_size, - self._data_size) = struct.unpack(">LL", data[0x4:0xc]) else: precondition(len(data) >= 0x44) x = 0x14 fieldsize = 0x8 fieldstruct = ">Q" - (self._segment_size, - self._data_size) = struct.unpack(">QQ", data[0x4:0x14]) self._version = version self._fieldsize = fieldsize @@ -321,67 +329,86 @@ class ReadBucketProxy: self._offsets[field] = offset return self._offsets - def get_block(self, blocknum): - num_segments = mathutil.div_ceil(self._data_size, self._segment_size) - if blocknum < num_segments-1: - size = self._segment_size - else: - size = self._data_size % self._segment_size - if size == 0: - size = self._segment_size - offset = self._offsets['data'] + blocknum * self._segment_size - return self._read(offset, size) + def _fetch_sharehashtree_and_ueb(self, offsets): + sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes'] + return self._read(offsets['share_hashes'], self.MAX_UEB_SIZE+sharehashtree_size) + + def _parse_sharehashtree_and_ueb(self, data): + sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes'] + if len(data) < sharehashtree_size: + raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data))) + if sharehashtree_size % (2+HASH_SIZE) != 0: + raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size)) + self._share_hashes = [] + for i in range(0, sharehashtree_size, 2+HASH_SIZE): + hashnum = struct.unpack(">H", data[i:i+2])[0] + hashvalue = data[i+2:i+2+HASH_SIZE] + self._share_hashes.append( (hashnum, hashvalue) ) + + i = self._offsets['uri_extension']-self._offsets['share_hashes'] + if len(data) < i+self._fieldsize: + raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),)) + length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0] + self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length] + + self._ready.fire(self) + + def _get_block_data(self, unused, blocknum, blocksize, thisblocksize): + offset = self._offsets['data'] + blocknum * blocksize + return self._read(offset, thisblocksize) + + def get_block_data(self, blocknum, blocksize, thisblocksize): + d = self._start_if_needed() + d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize) + return d def _str2l(self, s): """ split string (pulled from storage) into a list of blockids """ return [ s[i:i+HASH_SIZE] for i in range(0, len(s), HASH_SIZE) ] - def get_crypttext_hashes(self): + def _get_crypttext_hashes(self, unused=None): offset = self._offsets['crypttext_hash_tree'] size = self._offsets['block_hashes'] - offset d = self._read(offset, size) d.addCallback(self._str2l) return d - def get_block_hashes(self): + def get_crypttext_hashes(self): + d = self._start_if_needed() + d.addCallback(self._get_crypttext_hashes) + return d + + def _get_block_hashes(self, unused=None, at_least_these=()): + # TODO: fetch only at_least_these instead of all of them. offset = self._offsets['block_hashes'] size = self._offsets['share_hashes'] - offset d = self._read(offset, size) d.addCallback(self._str2l) return d + def get_block_hashes(self, at_least_these=()): + if at_least_these: + d = self._start_if_needed() + d.addCallback(self._get_block_hashes, at_least_these) + return d + else: + return defer.succeed([]) + + def _get_share_hashes(self, unused=None): + return self._share_hashes + def get_share_hashes(self): - offset = self._offsets['share_hashes'] - size = self._offsets['uri_extension'] - offset - assert size % (2+HASH_SIZE) == 0 - d = self._read(offset, size) - def _unpack_share_hashes(data): - assert len(data) == size - hashes = [] - for i in range(0, size, 2+HASH_SIZE): - hashnum = struct.unpack(">H", data[i:i+2])[0] - hashvalue = data[i+2:i+2+HASH_SIZE] - hashes.append( (hashnum, hashvalue) ) - return hashes - d.addCallback(_unpack_share_hashes) + d = self._start_if_needed() + d.addCallback(self._get_share_hashes) return d + def _get_uri_extension(self, unused=None): + return self._ueb_data + def get_uri_extension(self): - offset = self._offsets['uri_extension'] - d = self._read(offset, self._fieldsize) - def _got_length(data): - if len(data) != self._fieldsize: - raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),)) - length = struct.unpack(self._fieldstruct, data)[0] - if length >= 2**31: - # URI extension blocks are around 419 bytes long, so this must be corrupted. - # Anyway, the foolscap interface schema for "read" will not allow >= 2**31 bytes - # length. - raise RidiculouslyLargeURIExtensionBlock(length) - - return self._read(offset+self._fieldsize, length) - d.addCallback(_got_length) + d = self._start_if_needed() + d.addCallback(self._get_uri_extension) return d def _read(self, offset, length): diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 19100462..a176e01c 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -306,7 +306,7 @@ class IStorageBucketWriter(Interface): class IStorageBucketReader(Interface): - def get_block(blocknum=int): + def get_block_data(blocknum=int, blocksize=int, size=int): """Most blocks will be the same size. The last block might be shorter than the others. @@ -318,12 +318,12 @@ class IStorageBucketReader(Interface): @return: ListOf(Hash) """ - def get_block_hashes(): + def get_block_hashes(at_least_these=SetOf(int)): """ @return: ListOf(Hash) """ - def get_share_hashes(): + def get_share_hashes(at_least_these=SetOf(int)): """ @return: ListOf(TupleOf(int, Hash)) """ diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 048203d7..10097e3a 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -88,8 +88,7 @@ class CHKCheckerAndUEBFetcher: return b,peerid = self._readers.pop() rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index)) - d = rbp.startIfNecessary() - d.addCallback(lambda res: rbp.get_uri_extension()) + d = rbp.get_uri_extension() d.addCallback(self._got_uri_extension) d.addErrback(self._ueb_error) return d diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index 73a0d028..b05cfd99 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -105,7 +105,8 @@ def dump_immutable_share(options): print >>out, "%20s: %s" % ("verify-cap", verify_cap) sizes = {} - sizes['data'] = bp._data_size + sizes['data'] = (offsets['plaintext_hash_tree'] - + offsets['data']) sizes['validation'] = (offsets['uri_extension'] - offsets['plaintext_hash_tree']) sizes['uri-extension'] = len(UEB_data) @@ -586,6 +587,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): class ImmediateReadBucketProxy(ReadBucketProxy): def __init__(self, sf): self.sf = sf + ReadBucketProxy.__init__(self, "", "", "") def __repr__(self): return "" def _read(self, offset, size): @@ -594,7 +596,6 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): # use a ReadBucketProxy to parse the bucket and find the uri extension sf = storage.ShareFile(abs_sharefile) bp = ImmediateReadBucketProxy(sf) - call(bp.start) expiration_time = min( [lease.expiration_time for lease in sf.iter_leases()] ) diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index 57822040..d3e4ca93 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -36,14 +36,15 @@ class FakeBucketReaderWriterProxy: def get_peerid(self): return "peerid" - def startIfNecessary(self): - return defer.succeed(self) - def start(self): + def _start(self): if self.mode == "lost-early": f = Failure(LostPeerError("I went away early")) return eventual.fireEventually(f) return defer.succeed(self) + def put_header(self): + return self._start() + def put_block(self, segmentnum, data): if self.mode == "lost-early": f = Failure(LostPeerError("I went away early")) @@ -99,41 +100,50 @@ class FakeBucketReaderWriterProxy: def abort(self): return defer.succeed(None) - def get_block(self, blocknum): - def _try(): + def get_block_data(self, blocknum, blocksize, size): + d = self._start() + def _try(unused=None): assert isinstance(blocknum, (int, long)) if self.mode == "bad block": return flip_bit(self.blocks[blocknum]) return self.blocks[blocknum] - return defer.maybeDeferred(_try) + d.addCallback(_try) + return d def get_plaintext_hashes(self): - def _try(): + d = self._start() + def _try(unused=None): hashes = self.plaintext_hashes[:] return hashes - return defer.maybeDeferred(_try) + d.addCallback(_try) + return d def get_crypttext_hashes(self): - def _try(): + d = self._start() + def _try(unused=None): hashes = self.crypttext_hashes[:] if self.mode == "bad crypttext hashroot": hashes[0] = flip_bit(hashes[0]) if self.mode == "bad crypttext hash": hashes[1] = flip_bit(hashes[1]) return hashes - return defer.maybeDeferred(_try) + d.addCallback(_try) + return d - def get_block_hashes(self): - def _try(): + def get_block_hashes(self, at_least_these=()): + d = self._start() + def _try(unused=None): if self.mode == "bad blockhash": hashes = self.block_hashes[:] hashes[1] = flip_bit(hashes[1]) return hashes return self.block_hashes - return defer.maybeDeferred(_try) + d.addCallback(_try) + return d - def get_share_hashes(self): - def _try(): + def get_share_hashes(self, at_least_these=()): + d = self._start() + def _try(unused=None): if self.mode == "bad sharehash": hashes = self.share_hashes[:] hashes[1] = (hashes[1][0], flip_bit(hashes[1][1])) @@ -144,14 +154,17 @@ class FakeBucketReaderWriterProxy: # download.py is supposed to guard against this case. return [] return self.share_hashes - return defer.maybeDeferred(_try) + d.addCallback(_try) + return d def get_uri_extension(self): - def _try(): + d = self._start() + def _try(unused=None): if self.mode == "bad uri_extension": return flip_bit(self.uri_extension) return self.uri_extension - return defer.maybeDeferred(_try) + d.addCallback(_try) + return d def make_data(length): @@ -719,9 +732,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): # the first 7 servers have bad block hashes, so the sharehash tree # will not validate, and the download will fail modemap = dict([(i, "bad sharehash") - for i in range(7)] - + [(i, "good") - for i in range(7, 10)]) + for i in range(10)]) d = self.send_and_recover((4,8,10), bucket_modes=modemap) def _done(res): self.failUnless(isinstance(res, Failure)) @@ -739,12 +750,10 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): return self.send_and_recover((4,8,10), bucket_modes=modemap) def test_missing_sharehashes_failure(self): - # the first 7 servers are missing their sharehashes, so the - # sharehash tree will not validate, and the download will fail + # all servers are missing their sharehashes, so the sharehash tree will not validate, + # and the download will fail modemap = dict([(i, "missing sharehash") - for i in range(7)] - + [(i, "good") - for i in range(7, 10)]) + for i in range(10)]) d = self.send_and_recover((4,8,10), bucket_modes=modemap) def _done(res): self.failUnless(isinstance(res, Failure), res) diff --git a/src/allmydata/test/test_hashtree.py b/src/allmydata/test/test_hashtree.py index f5bbb49b..5388c6e9 100644 --- a/src/allmydata/test/test_hashtree.py +++ b/src/allmydata/test/test_hashtree.py @@ -62,6 +62,24 @@ class Incomplete(unittest.TestCase): self.failUnlessRaises(IndexError, ht.get_leaf, 8) self.failUnlessEqual(ht.get_leaf_index(0), 7) + def test_needed_hashes(self): + ht = hashtree.IncompleteHashTree(8) + self.failUnlessEqual(ht.needed_hashes(0), set([8, 4, 2])) + self.failUnlessEqual(ht.needed_hashes(0, True), set([7, 8, 4, 2])) + self.failUnlessEqual(ht.needed_hashes(1), set([7, 4, 2])) + self.failUnlessEqual(ht.needed_hashes(7), set([13, 5, 1])) + self.failUnlessEqual(ht.needed_hashes(7, False), set([13, 5, 1])) + self.failUnlessEqual(ht.needed_hashes(7, True), set([14, 13, 5, 1])) + ht = hashtree.IncompleteHashTree(1) + self.failUnlessEqual(ht.needed_hashes(0), set([])) + ht = hashtree.IncompleteHashTree(6) + self.failUnlessEqual(ht.needed_hashes(0), set([8, 4, 2])) + self.failUnlessEqual(ht.needed_hashes(0, True), set([7, 8, 4, 2])) + self.failUnlessEqual(ht.needed_hashes(1), set([7, 4, 2])) + self.failUnlessEqual(ht.needed_hashes(5), set([11, 6, 1])) + self.failUnlessEqual(ht.needed_hashes(5, False), set([11, 6, 1])) + self.failUnlessEqual(ht.needed_hashes(5, True), set([12, 11, 6, 1])) + def test_check(self): # first create a complete hash tree ht = make_tree(6) @@ -160,4 +178,3 @@ class Incomplete(unittest.TestCase): iht.set_hashes(chain, leaves={4: tagged_hash("tag", "4")}) except hashtree.BadHashError, e: self.fail("bad hash: %s" % e) - diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py index 8f8984a4..e9bdf0dc 100644 --- a/src/allmydata/test/test_immutable.py +++ b/src/allmydata/test/test_immutable.py @@ -380,9 +380,10 @@ class Test(ShareManglingMixin, unittest.TestCase): before_download_reads = self._count_reads() def _after_download(unused=None): after_download_reads = self._count_reads() - # To pass this test, you have to download the file using only 10 reads to get the - # UEB (in parallel from all shares), plus one read for each of the 3 shares. - self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads)) + # To pass this test, you have to download the file using only 10 reads total: 3 to + # get the headers from each share, 3 to get the share hash trees and uebs from each + # share, 1 to get the crypttext hashes, and 3 to get the block data from each share. + self.failIf(after_download_reads-before_download_reads > 10, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) return d @@ -403,7 +404,6 @@ class Test(ShareManglingMixin, unittest.TestCase): d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) return d - test_download_from_only_3_remaining_shares.todo = "I think this test is failing due to the downloader code not knowing how to handle URI corruption and keeping going. I'm going to commit new downloader code soon, and then see if this test starts passing." def test_download_abort_if_too_many_missing_shares(self): """ Test that download gives up quickly when it realizes there aren't enough shares out diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 3548c5cb..1b0d3ae3 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -125,10 +125,10 @@ class BucketProxy(unittest.TestCase): bw, rb, sharefname = self.make_bucket("test_create", 500) bp = WriteBucketProxy(rb, data_size=300, - segment_size=10, + block_size=10, num_segments=5, num_share_hashes=3, - uri_extension_size=500, nodeid=None) + uri_extension_size_max=500, nodeid=None) self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp)) def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class): @@ -156,13 +156,13 @@ class BucketProxy(unittest.TestCase): bw, rb, sharefname = self.make_bucket(name, sharesize) bp = wbp_class(rb, data_size=95, - segment_size=25, + block_size=25, num_segments=4, num_share_hashes=3, - uri_extension_size=len(uri_extension), + uri_extension_size_max=len(uri_extension), nodeid=None) - d = bp.start() + d = bp.put_header() d.addCallback(lambda res: bp.put_block(0, "a"*25)) d.addCallback(lambda res: bp.put_block(1, "b"*25)) d.addCallback(lambda res: bp.put_block(2, "c"*25)) @@ -182,21 +182,19 @@ class BucketProxy(unittest.TestCase): self.failUnless("to peer" in repr(rbp)) self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp)) - d1 = rbp.startIfNecessary() - d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent - d1.addCallback(lambda res: rbp.get_block(0)) + d1 = rbp.get_block_data(0, 25, 25) d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25)) - d1.addCallback(lambda res: rbp.get_block(1)) + d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25)) d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25)) - d1.addCallback(lambda res: rbp.get_block(2)) + d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25)) d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25)) - d1.addCallback(lambda res: rbp.get_block(3)) + d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20)) d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20)) d1.addCallback(lambda res: rbp.get_crypttext_hashes()) d1.addCallback(lambda res: self.failUnlessEqual(res, crypttext_hashes)) - d1.addCallback(lambda res: rbp.get_block_hashes()) + d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4)))) d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes)) d1.addCallback(lambda res: rbp.get_share_hashes()) d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))