From: Brian Warner Date: Fri, 13 Jul 2007 23:38:25 +0000 (-0700) Subject: storage: we must truncate short segments. Now most tests pass (except uri_extension) X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/%22doc.html/architecture.txt?a=commitdiff_plain;h=7589a8ee82eb65310af226bb0cca9d4b2c402827;p=tahoe-lafs%2Ftahoe-lafs.git storage: we must truncate short segments. Now most tests pass (except uri_extension) --- diff --git a/src/allmydata/download.py b/src/allmydata/download.py index e048f1d5..59649c9c 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -119,7 +119,7 @@ class ValidatedBucket: if not self._share_hash: d1 = self.bucket.get_share_hashes() else: - d1 = defer.succeed(None) + d1 = defer.succeed([]) # we might need to grab some elements of our block hash tree, to # validate the requested block up to the share hash @@ -149,9 +149,12 @@ class ValidatedBucket: sht.set_hashes(sh) self._share_hash = sht.get_leaf(self.sharenum) - #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d" % - # (self.sharenum, blocknum, len(blockdata))) blockhash = hashutil.block_hash(blockdata) + #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d " + # "%r .. %r: %s" % + # (self.sharenum, blocknum, len(blockdata), + # blockdata[:50], blockdata[-50:], idlib.b2a(blockhash))) + # we always validate the blockhash bh = dict(enumerate(blockhashes)) # replace blockhash root with validated value @@ -163,20 +166,33 @@ class ValidatedBucket: # likely a programming error log.msg("hash failure in block=%d, shnum=%d on %s" % (blocknum, self.sharenum, self.bucket)) - #log.msg(" block length: %d" % len(blockdata)) - #log.msg(" block hash: %s" % idlib.b2a_or_none(blockhash)) # not safe - #log.msg(" block data: %r" % (blockdata,)) - #log.msg(" root hash: %s" % idlib.b2a(self._roothash)) - #log.msg(" share hash tree:\n" + self.share_hash_tree.dump()) - #log.msg(" block hash tree:\n" + self.block_hash_tree.dump()) - #lines = [] - #for i,h in sorted(sharehashes): - # lines.append("%3d: %s" % (i, idlib.b2a_or_none(h))) - #log.msg(" sharehashes:\n" + "\n".join(lines) + "\n") - #lines = [] - #for i,h in enumerate(blockhashes): - # lines.append("%3d: %s" % (i, idlib.b2a_or_none(h))) - #log.msg(" blockhashes:\n" + "\n".join(lines) + "\n") + if self._share_hash: + log.msg(""" failure occurred when checking the block_hash_tree. + This suggests that either the block data was bad, or that the + block hashes we received along with it were bad.""") + else: + log.msg(""" the failure probably occurred when checking the + share_hash_tree, which suggests that the share hashes we + received from the remote peer were bad.""") + log.msg(" have self._share_hash: %s" % bool(self._share_hash)) + log.msg(" block length: %d" % len(blockdata)) + log.msg(" block hash: %s" % idlib.b2a_or_none(blockhash)) # not safe + if len(blockdata) < 100: + log.msg(" block data: %r" % (blockdata,)) + else: + log.msg(" block data start/end: %r .. %r" % + (blockdata[:50], blockdata[-50:])) + log.msg(" root hash: %s" % idlib.b2a(self._roothash)) + log.msg(" share hash tree:\n" + self.share_hash_tree.dump()) + log.msg(" block hash tree:\n" + self.block_hash_tree.dump()) + lines = [] + for i,h in sorted(sharehashes): + lines.append("%3d: %s" % (i, idlib.b2a_or_none(h))) + log.msg(" sharehashes:\n" + "\n".join(lines) + "\n") + lines = [] + for i,h in enumerate(blockhashes): + lines.append("%3d: %s" % (i, idlib.b2a_or_none(h))) + log.msg(" blockhashes:\n" + "\n".join(lines) + "\n") raise # If we made it here, the block is good. If the hash trees didn't diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index f3af448b..3f643964 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -302,6 +302,11 @@ class Encoder(object): d = self.send_subshare(shareid, segnum, subshare) dl.append(d) subshare_hash = hashutil.block_hash(subshare) + #from allmydata.util import idlib + #log.msg("creating block (shareid=%d, blocknum=%d) " + # "len=%d %r .. %r: %s" % + # (shareid, segnum, len(subshare), + # subshare[:50], subshare[-50:], idlib.b2a(subshare_hash))) self.subshare_hashes[shareid].append(subshare_hash) dl = self._gather_responses(dl) def _logit(res): diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py index 74b4c894..f3e92d15 100644 --- a/src/allmydata/storageserver.py +++ b/src/allmydata/storageserver.py @@ -6,9 +6,8 @@ from twisted.internet import defer from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ - RIBucketReader, IStorageBucketWriter, IStorageBucketReader -from allmydata import interfaces -from allmydata.util import fileutil, idlib + RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE +from allmydata.util import fileutil, idlib, mathutil from allmydata.util.assertutil import precondition # store/ @@ -151,22 +150,23 @@ a series of four-byte big-endian offset values, which indicate where each section starts. Each offset is measured from the beginning of the file. 0x00: segment size -0x04: offset of data (=00 00 00 1c) -0x08: offset of plaintext_hash_tree -0x0c: offset of crypttext_hash_tree -0x10: offset of block_hashes -0x14: offset of share_hashes -0x18: offset of uri_extension_length + uri_extension -0x1c: start of data - start of plaintext_hash_tree - start of crypttext_hash_tree - start of block_hashes - start of share_hashes +0x04: data size +0x08: offset of data (=00 00 00 1c) +0x0c: offset of plaintext_hash_tree +0x10: offset of crypttext_hash_tree +0x14: offset of block_hashes +0x18: offset of share_hashes +0x1c: offset of uri_extension_length + uri_extension +0x20: start of data +? : start of plaintext_hash_tree +? : start of crypttext_hash_tree +? : start of block_hashes +? : start of share_hashes each share_hash is written as a two-byte (big-endian) hashnum followed by the 32-byte SHA-256 hash. We only store the hashes necessary to validate the share hash root - start of uri_extension_length (four-byte big-endian value) - start of uri_extension +? : start of uri_extension_length (four-byte big-endian value) +? : start of uri_extension """ def allocated_size(data_size, num_segments, num_share_hashes, @@ -181,10 +181,10 @@ class WriteBucketProxy: def __init__(self, rref, data_size, segment_size, num_segments, num_share_hashes, uri_extension_size): self._rref = rref + self._data_size = data_size self._segment_size = segment_size self._num_segments = num_segments - HASH_SIZE = interfaces.HASH_SIZE self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE # how many share hashes are included in each share? This will be # about ln2(num_shares). @@ -193,7 +193,7 @@ class WriteBucketProxy: self._uri_extension_size = uri_extension_size offsets = self._offsets = {} - x = 0x1c + x = 0x20 offsets['data'] = x x += data_size offsets['plaintext_hash_tree'] = x @@ -206,16 +206,17 @@ class WriteBucketProxy: x += self._share_hash_size offsets['uri_extension'] = x - offset_data = struct.pack(">LLLLLLL", + offset_data = struct.pack(">LLLLLLLL", segment_size, + data_size, offsets['data'], offsets['plaintext_hash_tree'], offsets['crypttext_hash_tree'], offsets['block_hashes'], offsets['share_hashes'], - offsets['uri_extension'] + offsets['uri_extension'], ) - assert len(offset_data) == 7*4 + assert len(offset_data) == 8*4 self._offset_data = offset_data def start(self): @@ -229,7 +230,9 @@ class WriteBucketProxy: precondition(len(data) == self._segment_size, len(data), self._segment_size) else: - precondition(len(data) <= self._segment_size, + precondition(len(data) == (self._data_size - + (self._segment_size * + (self._num_segments - 1))), len(data), self._segment_size) return self._write(offset, data) @@ -298,17 +301,19 @@ class ReadBucketProxy: def start(self): # TODO: for small shares, read the whole bucket in start() - d = self._read(0, 7*4) + d = self._read(0, 8*4) self._offsets = {} def _got_offsets(data): self._segment_size = struct.unpack(">L", data[0:4])[0] - x = 4 + self._data_size = struct.unpack(">L", data[4:8])[0] + x = 0x08 for field in ( 'data', 'plaintext_hash_tree', 'crypttext_hash_tree', 'block_hashes', 'share_hashes', - 'uri_extension' ): + 'uri_extension', + ): offset = struct.unpack(">L", data[x:x+4])[0] x += 4 self._offsets[field] = offset @@ -316,13 +321,20 @@ class ReadBucketProxy: return d def get_block(self, blocknum): + num_segments = mathutil.div_ceil(self._data_size, self._segment_size) + if blocknum < num_segments-1: + size = self._segment_size + else: + size = self._data_size % self._segment_size + if size == 0: + size = self._segment_size offset = self._offsets['data'] + blocknum * self._segment_size - return self._read(offset, self._segment_size) + return self._read(offset, size) def _str2l(self, s): """ split string (pulled from storage) into a list of blockids """ - return [ s[i:i+interfaces.HASH_SIZE] - for i in range(0, len(s), interfaces.HASH_SIZE) ] + return [ s[i:i+HASH_SIZE] + for i in range(0, len(s), HASH_SIZE) ] def get_plaintext_hashes(self): offset = self._offsets['plaintext_hash_tree'] @@ -348,7 +360,6 @@ class ReadBucketProxy: def get_share_hashes(self): offset = self._offsets['share_hashes'] size = self._offsets['uri_extension'] - offset - HASH_SIZE = interfaces.HASH_SIZE assert size % (2+HASH_SIZE) == 0 d = self._read(offset, size) def _unpack_share_hashes(data): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 7d6afd42..01e282a8 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -102,7 +102,7 @@ class BucketProxy(unittest.TestCase): bw, rb, final = self.make_bucket("test_readwrite", 1406) bp = WriteBucketProxy(rb, - data_size=100, + data_size=95, segment_size=25, num_segments=4, num_share_hashes=3, @@ -112,7 +112,7 @@ class BucketProxy(unittest.TestCase): d.addCallback(lambda res: bp.put_block(0, "a"*25)) d.addCallback(lambda res: bp.put_block(1, "b"*25)) d.addCallback(lambda res: bp.put_block(2, "c"*25)) - d.addCallback(lambda res: bp.put_block(3, "d"*25)) + d.addCallback(lambda res: bp.put_block(3, "d"*20)) d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes)) d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes)) d.addCallback(lambda res: bp.put_block_hashes(block_hashes)) @@ -136,7 +136,7 @@ class BucketProxy(unittest.TestCase): d1.addCallback(lambda res: rbp.get_block(2)) d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25)) d1.addCallback(lambda res: rbp.get_block(3)) - d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*25)) + d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20)) d1.addCallback(lambda res: rbp.get_plaintext_hashes()) d1.addCallback(lambda res: diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 92a08763..eaddffa7 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -4,7 +4,7 @@ from twisted.python.failure import Failure from twisted.internet import defer from cStringIO import StringIO -from allmydata import upload, encode, storageserver +from allmydata import upload, encode from allmydata.uri import unpack_uri, unpack_lit from allmydata.util.assertutil import precondition from foolscap import eventual @@ -35,7 +35,7 @@ class FakeStorageServer: return d def allocate_buckets(self, crypttext_hash, sharenums, - share_size, blocksize, canary): + share_size, canary): #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size) if self.mode == "full": return (set(), {},)