? : start of uri_extension
"""
+"""
+v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
+limitations described in #346.
+
+0x00: version number (=00 00 00 02)
+0x04: segment size
+0x0c: data size
+0x14: offset of data (=00 00 00 00 00 00 00 44)
+0x1c: offset of plaintext_hash_tree
+0x24: offset of crypttext_hash_tree
+0x2c: offset of block_hashes
+0x34: offset of share_hashes
+0x3c: offset of uri_extension_length + uri_extension
+0x44: start of data
+? : rest of share is the same as v1, above
+... ...
+? : start of uri_extension_length (eight-byte big-endian value)
+"""
+
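As a sanity check on the layout above (an illustration, not part of the
patch): the fixed part of a v2 header is struct.calcsize(">LQQQQQQQQ") =
0x44 bytes, which is why 'offset of data' in a well-formed v2 header is
always 0x44. The numbers below are made up: 1000000 bytes of data,
262144-byte segments (so 4 segments), and 3 share hashes.

    import struct

    V2_HEADER = ">LQQQQQQQQ"  # 4-byte version, then eight 8-byte big-endian fields
    assert struct.calcsize(V2_HEADER) == 0x44

    header = struct.pack(V2_HEADER, 2, 262144, 1000000,
                         0x44,     # offset of data
                         1000068,  # offset of plaintext_hash_tree (0x44 + 1000000)
                         1000292,  # offset of crypttext_hash_tree (+ 7*32)
                         1000516,  # offset of block_hashes        (+ 7*32)
                         1000740,  # offset of share_hashes        (+ 7*32)
                         1000842)  # offset of uri_extension       (+ 3*(2+32))
    fields = struct.unpack(V2_HEADER, header)
    assert fields[0] == 2 and fields[3] == 0x44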
def allocated_size(data_size, num_segments, num_share_hashes,
uri_extension_size):
wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
uri_extension_size, None)
uri_extension_starts_at = wbp._offsets['uri_extension']
- return uri_extension_starts_at + 4 + uri_extension_size
+ return uri_extension_starts_at + wbp.fieldsize + uri_extension_size
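A quick worked example, using the same parameters as the read/write test
further below: 95 bytes of data, 4 segments, 3 share hashes, and a 500-byte
URI extension put the v1 uri_extension offset at
0x24 + 95 + 3*(7*32) + 3*(2+32) = 905, so allocated_size() returns
905 + 4 + 500 = 1409.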
class WriteBucketProxy:
implements(IStorageBucketWriter)
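+ # each offset field in the share header is 'fieldsize' bytes wide and is
+ # packed with 'fieldstruct'; the v2 subclass below widens these from
+ # four-byte ">L" fields to eight-byte ">Q" fields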
+ fieldsize = 4
+ fieldstruct = ">L"
+
def __init__(self, rref, data_size, segment_size, num_segments,
num_share_hashes, uri_extension_size, nodeid):
self._rref = rref
self._num_segments = num_segments
self._nodeid = nodeid
- if segment_size >= 2**32 or data_size >= 2**32:
- raise FileTooLargeError("This file is too large to be uploaded (data_size).")
-
effective_segments = mathutil.next_power_of_k(num_segments,2)
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
# we commit to not sending a uri extension larger than this
self._uri_extension_size = uri_extension_size
+ self._create_offsets(segment_size, data_size)
+
+ def _create_offsets(self, segment_size, data_size):
+ if segment_size >= 2**32 or data_size >= 2**32:
+ raise FileTooLargeError("This file is too large to be uploaded (data_size).")
+
offsets = self._offsets = {}
x = 0x24
offsets['data'] = x
assert isinstance(data, str)
precondition(len(data) <= self._uri_extension_size,
len(data), self._uri_extension_size)
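+ # the extension is stored as a length-prefixed string; the prefix is
+ # this version's fieldsize bytes wide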
- length = struct.pack(">L", len(data))
+ length = struct.pack(self.fieldstruct, len(data))
return self._write(offset, length+data)
def _write(self, offset, data):
def abort(self):
return self._rref.callRemoteOnly("abort")
+class WriteBucketProxy_v2(WriteBucketProxy):
+ fieldsize = 8
+ fieldstruct = ">Q"
+
+ def _create_offsets(self, segment_size, data_size):
+ if segment_size >= 2**64 or data_size >= 2**64:
+ raise FileTooLargeError("This file is too large to be uploaded (data_size).")
+
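+ # lay the six sections out back-to-back as in v1, but starting after
+ # the 0x44-byte header and with the size ceiling raised to 2**64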
+ offsets = self._offsets = {}
+ x = 0x44
+ offsets['data'] = x
+ x += data_size
+ offsets['plaintext_hash_tree'] = x
+ x += self._segment_hash_size
+ offsets['crypttext_hash_tree'] = x
+ x += self._segment_hash_size
+ offsets['block_hashes'] = x
+ x += self._segment_hash_size
+ offsets['share_hashes'] = x
+ x += self._share_hash_size
+ offsets['uri_extension'] = x
+
+ if x >= 2**64:
+ raise FileTooLargeError("This file is too large to be uploaded (offsets).")
+
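+ # the fixed v2 header: a four-byte version number followed by eight
+ # 8-byte big-endian fields (two sizes, then six offsets)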
+ offset_data = struct.pack(">LQQQQQQQQ",
+ 2, # version number
+ segment_size,
+ data_size,
+ offsets['data'],
+ offsets['plaintext_hash_tree'],
+ offsets['crypttext_hash_tree'],
+ offsets['block_hashes'],
+ offsets['share_hashes'],
+ offsets['uri_extension'],
+ )
+ assert len(offset_data) == 0x44, len(offset_data)
+ self._offset_data = offset_data
+
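The patch does not show how callers choose between the two proxy classes; a
minimal sketch of the selection (choose_wbp_class is a hypothetical helper
name, not from the patch) would be:

    from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2

    def choose_wbp_class(segment_size, data_size):
        # v1 headers store sizes and offsets in four-byte fields, so
        # anything that does not fit in 32 bits needs the v2 layout
        if segment_size >= 2**32 or data_size >= 2**32:
            return WriteBucketProxy_v2
        return WriteBucketProxy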
class ReadBucketProxy:
implements(IStorageBucketReader)
def __init__(self, rref, peerid=None, storage_index_s=None):
def start(self):
# TODO: for small shares, read the whole bucket in start()
- d = self._read(0, 0x24)
+ d = self._read(0, 0x44)
d.addCallback(self._parse_offsets)
def _started(res):
self._started = True
return d
def _parse_offsets(self, data):
- precondition(len(data) == 0x24)
+ precondition(len(data) >= 0x4)
self._offsets = {}
- (version, self._segment_size, self._data_size) = \
- struct.unpack(">LLL", data[0:0xc])
- _assert(version == 1)
- x = 0x0c
+ (version,) = struct.unpack(">L", data[0:4])
+ _assert(version in (1,2))
+
+ if version == 1:
+ precondition(len(data) >= 0x24)
+ x = 0x0c
+ fieldsize = 0x4
+ fieldstruct = ">L"
+ (self._segment_size,
+ self._data_size) = struct.unpack(">LL", data[0x4:0xc])
+ else:
+ precondition(len(data) >= 0x44)
+ x = 0x14
+ fieldsize = 0x8
+ fieldstruct = ">Q"
+ (self._segment_size,
+ self._data_size) = struct.unpack(">QQ", data[0x4:0x14])
+
+ self._version = version
+ self._fieldsize = fieldsize
+ self._fieldstruct = fieldstruct
+
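+ # whichever version we found, the rest of the header is six offsets,
+ # each 'fieldsize' bytes wide, in the order listed here: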
for field in ( 'data',
'plaintext_hash_tree',
'crypttext_hash_tree',
'block_hashes',
'share_hashes',
'uri_extension',
):
- offset = struct.unpack(">L", data[x:x+4])[0]
- x += 4
+ offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
+ x += fieldsize
self._offsets[field] = offset
return self._offsets
def get_uri_extension(self):
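+ # the stored URI extension begins with its length; the length field is
+ # four bytes in v1 shares and eight bytes in v2 shares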
offset = self._offsets['uri_extension']
- d = self._read(offset, 4)
+ d = self._read(offset, self._fieldsize)
def _got_length(data):
- length = struct.unpack(">L", data)[0]
- return self._read(offset+4, length)
+ length = struct.unpack(self._fieldstruct, data)[0]
+ return self._read(offset+self._fieldsize, length)
d.addCallback(_got_length)
return d
from allmydata.storage import BucketWriter, BucketReader, \
StorageServer, MutableShareFile, \
storage_index_to_dir, DataTooLargeError, LeaseInfo
-from allmydata.immutable.layout import WriteBucketProxy, ReadBucketProxy
+from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
+ ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
uri_extension_size=500, nodeid=None)
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
- def test_readwrite(self):
+ def _do_test_readwrite(self, header_size, wbp_class, rbp_class):
# Let's pretend each share has 100 bytes of data, and that there are
# 4 segments (25 bytes each), and 8 shares total. So the three
# per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
# block_hashes) will have 4 leaves and 7 nodes each. The per-share
# merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
# nodes. Furthermore, let's assume the uri_extension is 500 bytes
# long. That should make the whole share:
#
# 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
+ # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
+
+ sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
for i in range(7)]
for i in (1,9,13)]
uri_extension = "s" + "E"*498 + "e"
- bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
- bp = WriteBucketProxy(rb,
- data_size=95,
- segment_size=25,
- num_segments=4,
- num_share_hashes=3,
- uri_extension_size=len(uri_extension),
- nodeid=None)
+ bw, rb, sharefname = self.make_bucket("test_readwrite", sharesize)
+ bp = wbp_class(rb,
+ data_size=95,
+ segment_size=25,
+ num_segments=4,
+ num_share_hashes=3,
+ uri_extension_size=len(uri_extension),
+ nodeid=None)
d = bp.start()
d.addCallback(lambda res: bp.put_block(0, "a"*25))
br = BucketReader(self, sharefname)
rb = RemoteBucket()
rb.target = br
- rbp = ReadBucketProxy(rb, peerid="abc")
+ rbp = rbp_class(rb, peerid="abc")
self.failUnless("to peer" in repr(rbp))
self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
return d
+ def test_readwrite_v1(self):
+ return self._do_test_readwrite(0x24, WriteBucketProxy, ReadBucketProxy)
+ def test_readwrite_v2(self):
+ return self._do_test_readwrite(0x44, WriteBucketProxy_v2, ReadBucketProxy)
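Both methods run the same round trip through the matching proxy classes.
With Twisted's trial runner they can be exercised individually, e.g.
"trial allmydata.test.test_storage" (assuming the usual test module path).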
class Server(unittest.TestCase):