From: Brian Warner Date: Fri, 10 Oct 2008 01:13:27 +0000 (-0700) Subject: storage: introduce v2 immutable shares, with 8-byte offsets fields, to remove two... X-Git-Url: https://git.rkrishnan.org/pf/content/simplejson/running.html?a=commitdiff_plain;h=7031a69beeed4abbf20b2903f10894d7545411b2;p=tahoe-lafs%2Ftahoe-lafs.git storage: introduce v2 immutable shares, with 8-byte offsets fields, to remove two of the three size limitations in #346. This code handles v2 shares but does not generate them. We'll make a release with this v2-tolerance, wait a while, then make a second release that actually generates v2 shares, to avoid compatibility problems. --- diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py index 9f7be655..6e38e2d0 100644 --- a/src/allmydata/immutable/layout.py +++ b/src/allmydata/immutable/layout.py @@ -34,15 +34,37 @@ section starts. Each offset is measured from the beginning of the file. ? : start of uri_extension """ +""" +v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size +limitations described in #346. + +0x00: version number (=00 00 00 02) +0x04: segment size +0x0c: data size +0x14: offset of data (=00 00 00 00 00 00 00 44) +0x1c: offset of plaintext_hash_tree +0x24: offset of crypttext_hash_tree +0x2c: offset of block_hashes +0x34: offset of share_hashes +0x3c: offset of uri_extension_length + uri_extension +0x44: start of data + : rest of share is the same as v1, above +... ... +? : start of uri_extension_length (eight-byte big-endian value) +""" + def allocated_size(data_size, num_segments, num_share_hashes, uri_extension_size): wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes, uri_extension_size, None) uri_extension_starts_at = wbp._offsets['uri_extension'] - return uri_extension_starts_at + 4 + uri_extension_size + return uri_extension_starts_at + wbp.fieldsize + uri_extension_size class WriteBucketProxy: implements(IStorageBucketWriter) + fieldsize = 4 + fieldstruct = ">L" + def __init__(self, rref, data_size, segment_size, num_segments, num_share_hashes, uri_extension_size, nodeid): self._rref = rref @@ -51,9 +73,6 @@ class WriteBucketProxy: self._num_segments = num_segments self._nodeid = nodeid - if segment_size >= 2**32 or data_size >= 2**32: - raise FileTooLargeError("This file is too large to be uploaded (data_size).") - effective_segments = mathutil.next_power_of_k(num_segments,2) self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE # how many share hashes are included in each share? This will be @@ -62,6 +81,12 @@ class WriteBucketProxy: # we commit to not sending a uri extension larger than this self._uri_extension_size = uri_extension_size + self._create_offsets(segment_size, data_size) + + def _create_offsets(self, segment_size, data_size): + if segment_size >= 2**32 or data_size >= 2**32: + raise FileTooLargeError("This file is too large to be uploaded (data_size).") + offsets = self._offsets = {} x = 0x24 offsets['data'] = x @@ -169,7 +194,7 @@ class WriteBucketProxy: assert isinstance(data, str) precondition(len(data) <= self._uri_extension_size, len(data), self._uri_extension_size) - length = struct.pack(">L", len(data)) + length = struct.pack(self.fieldstruct, len(data)) return self._write(offset, length+data) def _write(self, offset, data): @@ -182,6 +207,45 @@ class WriteBucketProxy: def abort(self): return self._rref.callRemoteOnly("abort") +class WriteBucketProxy_v2(WriteBucketProxy): + fieldsize = 8 + fieldstruct = ">Q" + + def _create_offsets(self, segment_size, data_size): + if segment_size >= 2**64 or data_size >= 2**64: + raise FileTooLargeError("This file is too large to be uploaded (data_size).") + + offsets = self._offsets = {} + x = 0x44 + offsets['data'] = x + x += data_size + offsets['plaintext_hash_tree'] = x + x += self._segment_hash_size + offsets['crypttext_hash_tree'] = x + x += self._segment_hash_size + offsets['block_hashes'] = x + x += self._segment_hash_size + offsets['share_hashes'] = x + x += self._share_hash_size + offsets['uri_extension'] = x + + if x >= 2**64: + raise FileTooLargeError("This file is too large to be uploaded (offsets).") + + offset_data = struct.pack(">LQQQQQQQQ", + 2, # version number + segment_size, + data_size, + offsets['data'], + offsets['plaintext_hash_tree'], + offsets['crypttext_hash_tree'], + offsets['block_hashes'], + offsets['share_hashes'], + offsets['uri_extension'], + ) + assert len(offset_data) == 0x44, len(offset_data) + self._offset_data = offset_data + class ReadBucketProxy: implements(IStorageBucketReader) def __init__(self, rref, peerid=None, storage_index_s=None): @@ -207,7 +271,7 @@ class ReadBucketProxy: def start(self): # TODO: for small shares, read the whole bucket in start() - d = self._read(0, 0x24) + d = self._read(0, 0x44) d.addCallback(self._parse_offsets) def _started(res): self._started = True @@ -216,12 +280,30 @@ class ReadBucketProxy: return d def _parse_offsets(self, data): - precondition(len(data) == 0x24) + precondition(len(data) >= 0x4) self._offsets = {} - (version, self._segment_size, self._data_size) = \ - struct.unpack(">LLL", data[0:0xc]) - _assert(version == 1) - x = 0x0c + (version,) = struct.unpack(">L", data[0:4]) + _assert(version in (1,2)) + + if version == 1: + precondition(len(data) >= 0x24) + x = 0x0c + fieldsize = 0x4 + fieldstruct = ">L" + (self._segment_size, + self._data_size) = struct.unpack(">LL", data[0x4:0xc]) + else: + precondition(len(data) >= 0x44) + x = 0x14 + fieldsize = 0x8 + fieldstruct = ">Q" + (self._segment_size, + self._data_size) = struct.unpack(">QQ", data[0x4:0x14]) + + self._version = version + self._fieldsize = fieldsize + self._fieldstruct = fieldstruct + for field in ( 'data', 'plaintext_hash_tree', 'crypttext_hash_tree', @@ -229,8 +311,8 @@ class ReadBucketProxy: 'share_hashes', 'uri_extension', ): - offset = struct.unpack(">L", data[x:x+4])[0] - x += 4 + offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0] + x += fieldsize self._offsets[field] = offset return self._offsets @@ -289,10 +371,10 @@ class ReadBucketProxy: def get_uri_extension(self): offset = self._offsets['uri_extension'] - d = self._read(offset, 4) + d = self._read(offset, self._fieldsize) def _got_length(data): - length = struct.unpack(">L", data)[0] - return self._read(offset+4, length) + length = struct.unpack(self._fieldstruct, data)[0] + return self._read(offset+self._fieldsize, length) d.addCallback(_got_length) return d diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 70db7ab7..edb9ff94 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -9,7 +9,8 @@ from allmydata.util import fileutil, hashutil from allmydata.storage import BucketWriter, BucketReader, \ StorageServer, MutableShareFile, \ storage_index_to_dir, DataTooLargeError, LeaseInfo -from allmydata.immutable.layout import WriteBucketProxy, ReadBucketProxy +from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ + ReadBucketProxy from allmydata.interfaces import BadWriteEnablerError from allmydata.test.common import LoggingServiceParent @@ -131,7 +132,7 @@ class BucketProxy(unittest.TestCase): uri_extension_size=500, nodeid=None) self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp)) - def test_readwrite(self): + def _do_test_readwrite(self, header_size, wbp_class, rbp_class): # Let's pretend each share has 100 bytes of data, and that there are # 4 segments (25 bytes each), and 8 shares total. So the three # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree, @@ -141,6 +142,9 @@ class BucketProxy(unittest.TestCase): # long. That should make the whole share: # # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long + # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long + + sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i) for i in range(7)] @@ -152,14 +156,14 @@ class BucketProxy(unittest.TestCase): for i in (1,9,13)] uri_extension = "s" + "E"*498 + "e" - bw, rb, sharefname = self.make_bucket("test_readwrite", 1414) - bp = WriteBucketProxy(rb, - data_size=95, - segment_size=25, - num_segments=4, - num_share_hashes=3, - uri_extension_size=len(uri_extension), - nodeid=None) + bw, rb, sharefname = self.make_bucket("test_readwrite", sharesize) + bp = wbp_class(rb, + data_size=95, + segment_size=25, + num_segments=4, + num_share_hashes=3, + uri_extension_size=len(uri_extension), + nodeid=None) d = bp.start() d.addCallback(lambda res: bp.put_block(0, "a"*25)) @@ -178,7 +182,7 @@ class BucketProxy(unittest.TestCase): br = BucketReader(self, sharefname) rb = RemoteBucket() rb.target = br - rbp = ReadBucketProxy(rb, peerid="abc") + rbp = rbp_class(rb, peerid="abc") self.failUnless("to peer" in repr(rbp)) self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp)) @@ -213,7 +217,11 @@ class BucketProxy(unittest.TestCase): return d + def test_readwrite_v1(self): + return self._do_test_readwrite(0x24, WriteBucketProxy, ReadBucketProxy) + def test_readwrite_v2(self): + return self._do_test_readwrite(0x44, WriteBucketProxy_v2, ReadBucketProxy) class Server(unittest.TestCase):