storage: introduce v2 immutable shares, with 8-byte offset fields, to remove two...
author    Brian Warner <warner@allmydata.com>
Fri, 10 Oct 2008 01:13:27 +0000 (18:13 -0700)
committer Brian Warner <warner@allmydata.com>
Fri, 10 Oct 2008 01:13:27 +0000 (18:13 -0700)
src/allmydata/immutable/layout.py
src/allmydata/test/test_storage.py

index 9f7be655acdc491abc35211441e78bd5ff4410f4..6e38e2d0930b25880ccf6704d44bdb79fd822164 100644 (file)
@@ -34,15 +34,37 @@ section starts. Each offset is measured from the beginning of the file.
 ?   : start of uri_extension
 """
 
+"""
+v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
+limitations described in #346.
+
+0x00: version number (=00 00 00 02)
+0x04: segment size
+0x0c: data size
+0x14: offset of data (=00 00 00 00 00 00 00 44)
+0x1c: offset of plaintext_hash_tree
+0x24: offset of crypttext_hash_tree
+0x2c: offset of block_hashes
+0x34: offset of share_hashes
+0x3c: offset of uri_extension_length + uri_extension
+0x44: start of data
+    : rest of share is the same as v1, above
+...   ...
+?   : start of uri_extension_length (eight-byte big-endian value)
+"""
+
 def allocated_size(data_size, num_segments, num_share_hashes,
                    uri_extension_size):
     wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
                            uri_extension_size, None)
     uri_extension_starts_at = wbp._offsets['uri_extension']
-    return uri_extension_starts_at + 4 + uri_extension_size
+    return uri_extension_starts_at + wbp.fieldsize + uri_extension_size
 
 class WriteBucketProxy:
     implements(IStorageBucketWriter)
+    fieldsize = 4
+    fieldstruct = ">L"
+
     def __init__(self, rref, data_size, segment_size, num_segments,
                  num_share_hashes, uri_extension_size, nodeid):
         self._rref = rref
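With both layouts now described, the header sizes can be sanity-checked directly. This is a sketch (not part of the patch), assuming the format-string names below; the ">LQQQQQQQQ" form matches the one packed by the new WriteBucketProxy_v2 code further down.

# Sketch (not part of the patch): the v1 header is nine 4-byte big-endian
# fields (version, segment size, data size, six offsets) = 0x24 bytes; the
# v2 header keeps a 4-byte version but widens everything else to 8 bytes,
# giving 0x44 bytes.
import struct

V1_HEADER = ">LLLLLLLLL"
V2_HEADER = ">LQQQQQQQQ"

assert struct.calcsize(V1_HEADER) == 0x24   # v1 data starts at offset 0x24
assert struct.calcsize(V2_HEADER) == 0x44   # v2 data starts at offset 0x44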
@@ -51,9 +73,6 @@ class WriteBucketProxy:
         self._num_segments = num_segments
         self._nodeid = nodeid
 
-        if segment_size >= 2**32 or data_size >= 2**32:
-            raise FileTooLargeError("This file is too large to be uploaded (data_size).")
-
         effective_segments = mathutil.next_power_of_k(num_segments,2)
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
         # how many share hashes are included in each share? This will be
@@ -62,6 +81,12 @@ class WriteBucketProxy:
         # we commit to not sending a uri extension larger than this
         self._uri_extension_size = uri_extension_size
 
+        self._create_offsets(segment_size, data_size)
+
+    def _create_offsets(self, segment_size, data_size):
+        if segment_size >= 2**32 or data_size >= 2**32:
+            raise FileTooLargeError("This file is too large to be uploaded (data_size).")
+
         offsets = self._offsets = {}
         x = 0x24
         offsets['data'] = x
@@ -169,7 +194,7 @@ class WriteBucketProxy:
         assert isinstance(data, str)
         precondition(len(data) <= self._uri_extension_size,
                      len(data), self._uri_extension_size)
-        length = struct.pack(">L", len(data))
+        length = struct.pack(self.fieldstruct, len(data))
         return self._write(offset, length+data)
 
     def _write(self, offset, data):
@@ -182,6 +207,45 @@ class WriteBucketProxy:
     def abort(self):
         return self._rref.callRemoteOnly("abort")
 
+class WriteBucketProxy_v2(WriteBucketProxy):
+    fieldsize = 8
+    fieldstruct = ">Q"
+
+    def _create_offsets(self, segment_size, data_size):
+        if segment_size >= 2**64 or data_size >= 2**64:
+            raise FileTooLargeError("This file is too large to be uploaded (data_size).")
+
+        offsets = self._offsets = {}
+        x = 0x44
+        offsets['data'] = x
+        x += data_size
+        offsets['plaintext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['crypttext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['block_hashes'] = x
+        x += self._segment_hash_size
+        offsets['share_hashes'] = x
+        x += self._share_hash_size
+        offsets['uri_extension'] = x
+
+        if x >= 2**64:
+            raise FileTooLargeError("This file is too large to be uploaded (offsets).")
+
+        offset_data = struct.pack(">LQQQQQQQQ",
+                                  2, # version number
+                                  segment_size,
+                                  data_size,
+                                  offsets['data'],
+                                  offsets['plaintext_hash_tree'],
+                                  offsets['crypttext_hash_tree'],
+                                  offsets['block_hashes'],
+                                  offsets['share_hashes'],
+                                  offsets['uri_extension'],
+                                  )
+        assert len(offset_data) == 0x44, len(offset_data)
+        self._offset_data = offset_data
+
 class ReadBucketProxy:
     implements(IStorageBucketReader)
     def __init__(self, rref, peerid=None, storage_index_s=None):
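Because WriteBucketProxy_v2 only overrides _create_offsets and the two field-size class attributes, callers can treat the two proxies interchangeably. A hypothetical sketch (the helper name is illustrative and not part of this patch): use the v1 layout when it fits, and fall back to the v2 layout when the v1 proxy rejects the sizes.

# Hypothetical helper (not in this patch): prefer the v1 layout, falling back
# to v2's 8-byte offsets when __init__ raises FileTooLargeError.
def make_write_bucket_proxy(rref, data_size, segment_size, num_segments,
                            num_share_hashes, uri_extension_size, nodeid):
    try:
        return WriteBucketProxy(rref, data_size, segment_size, num_segments,
                                num_share_hashes, uri_extension_size, nodeid)
    except FileTooLargeError:
        return WriteBucketProxy_v2(rref, data_size, segment_size, num_segments,
                                   num_share_hashes, uri_extension_size, nodeid)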
@@ -207,7 +271,7 @@ class ReadBucketProxy:
 
     def start(self):
         # TODO: for small shares, read the whole bucket in start()
-        d = self._read(0, 0x24)
+        d = self._read(0, 0x44)
         d.addCallback(self._parse_offsets)
         def _started(res):
             self._started = True
@@ -216,12 +280,30 @@ class ReadBucketProxy:
         return d
 
     def _parse_offsets(self, data):
-        precondition(len(data) == 0x24)
+        precondition(len(data) >= 0x4)
         self._offsets = {}
-        (version, self._segment_size, self._data_size) = \
-                  struct.unpack(">LLL", data[0:0xc])
-        _assert(version == 1)
-        x = 0x0c
+        (version,) = struct.unpack(">L", data[0:4])
+        _assert(version in (1,2))
+
+        if version == 1:
+            precondition(len(data) >= 0x24)
+            x = 0x0c
+            fieldsize = 0x4
+            fieldstruct = ">L"
+            (self._segment_size,
+             self._data_size) = struct.unpack(">LL", data[0x4:0xc])
+        else:
+            precondition(len(data) >= 0x44)
+            x = 0x14
+            fieldsize = 0x8
+            fieldstruct = ">Q"
+            (self._segment_size,
+             self._data_size) = struct.unpack(">QQ", data[0x4:0x14])
+
+        self._version = version
+        self._fieldsize = fieldsize
+        self._fieldstruct = fieldstruct
+
         for field in ( 'data',
                        'plaintext_hash_tree',
                        'crypttext_hash_tree',
@@ -229,8 +311,8 @@ class ReadBucketProxy:
                        'share_hashes',
                        'uri_extension',
                        ):
-            offset = struct.unpack(">L", data[x:x+4])[0]
-            x += 4
+            offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
+            x += fieldsize
             self._offsets[field] = offset
         return self._offsets
 
@@ -289,10 +371,10 @@ class ReadBucketProxy:
 
     def get_uri_extension(self):
         offset = self._offsets['uri_extension']
-        d = self._read(offset, 4)
+        d = self._read(offset, self._fieldsize)
         def _got_length(data):
-            length = struct.unpack(">L", data)[0]
-            return self._read(offset+4, length)
+            length = struct.unpack(self._fieldstruct, data)[0]
+            return self._read(offset+self._fieldsize, length)
         d.addCallback(_got_length)
         return d
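The reader side needs no caller-visible switch: _parse_offsets sniffs the 4-byte version field and picks the field width itself. As a sketch of what that amounts to for a version-2 header (the function name is illustrative only, and `header` is assumed to hold at least the first 0x44 bytes of the share):

# Sketch (illustrative only), mirroring what _parse_offsets does for version 2.
import struct

def parse_v2_offsets(header):
    (version,) = struct.unpack(">L", header[0:4])
    assert version == 2
    segment_size, data_size = struct.unpack(">QQ", header[0x4:0x14])
    fields = ('data', 'plaintext_hash_tree', 'crypttext_hash_tree',
              'block_hashes', 'share_hashes', 'uri_extension')
    offsets = dict(zip(fields, struct.unpack(">QQQQQQ", header[0x14:0x44])))
    return segment_size, data_size, offsets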
 
index 70db7ab738d5acd30b3ef86d013ae43b8f4fb43a..edb9ff94eee534f639398e853d2d12d3e0244a48 100644 (file)
@@ -9,7 +9,8 @@ from allmydata.util import fileutil, hashutil
 from allmydata.storage import BucketWriter, BucketReader, \
      StorageServer, MutableShareFile, \
      storage_index_to_dir, DataTooLargeError, LeaseInfo
-from allmydata.immutable.layout import WriteBucketProxy, ReadBucketProxy
+from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
+     ReadBucketProxy
 from allmydata.interfaces import BadWriteEnablerError
 from allmydata.test.common import LoggingServiceParent
 
@@ -131,7 +132,7 @@ class BucketProxy(unittest.TestCase):
                               uri_extension_size=500, nodeid=None)
         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
 
-    def test_readwrite(self):
+    def _do_test_readwrite(self, header_size, wbp_class, rbp_class):
         # Let's pretend each share has 100 bytes of data, and that there are
         # 4 segments (25 bytes each), and 8 shares total. So the three
         # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
@@ -141,6 +142,9 @@ class BucketProxy(unittest.TestCase):
         # long. That should make the whole share:
         #
         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
+        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
+
+        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
 
         plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                             for i in range(7)]
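The sizes quoted in the comment above work out as follows (a quick arithmetic check, not part of the test): header, 100 bytes of block data, three hash trees of seven 32-byte hashes each, three (2+32)-byte share-hash-chain entries, and a 4-byte length plus the 500-byte URI extension.

# Arithmetic check of the quoted share sizes (sketch, not part of the test):
v1_size = 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4 + 500
v2_size = 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4 + 500
assert (v1_size, v2_size) == (1414, 1446)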
@@ -152,14 +156,14 @@ class BucketProxy(unittest.TestCase):
                         for i in (1,9,13)]
         uri_extension = "s" + "E"*498 + "e"
 
-        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
-        bp = WriteBucketProxy(rb,
-                              data_size=95,
-                              segment_size=25,
-                              num_segments=4,
-                              num_share_hashes=3,
-                              uri_extension_size=len(uri_extension),
-                              nodeid=None)
+        bw, rb, sharefname = self.make_bucket("test_readwrite", sharesize)
+        bp = wbp_class(rb,
+                       data_size=95,
+                       segment_size=25,
+                       num_segments=4,
+                       num_share_hashes=3,
+                       uri_extension_size=len(uri_extension),
+                       nodeid=None)
 
         d = bp.start()
         d.addCallback(lambda res: bp.put_block(0, "a"*25))
@@ -178,7 +182,7 @@ class BucketProxy(unittest.TestCase):
             br = BucketReader(self, sharefname)
             rb = RemoteBucket()
             rb.target = br
-            rbp = ReadBucketProxy(rb, peerid="abc")
+            rbp = rbp_class(rb, peerid="abc")
             self.failUnless("to peer" in repr(rbp))
             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
 
@@ -213,7 +217,11 @@ class BucketProxy(unittest.TestCase):
 
         return d
 
+    def test_readwrite_v1(self):
+        return self._do_test_readwrite(0x24, WriteBucketProxy, ReadBucketProxy)
 
+    def test_readwrite_v2(self):
+        return self._do_test_readwrite(0x44, WriteBucketProxy_v2, ReadBucketProxy)
 
 class Server(unittest.TestCase):