]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
storage: we must truncate short segments. Now most tests pass (except uri_extension)
authorBrian Warner <warner@lothar.com>
Fri, 13 Jul 2007 23:38:25 +0000 (16:38 -0700)
committerBrian Warner <warner@lothar.com>
Fri, 13 Jul 2007 23:38:25 +0000 (16:38 -0700)
src/allmydata/download.py
src/allmydata/encode.py
src/allmydata/storageserver.py
src/allmydata/test/test_storage.py
src/allmydata/test/test_upload.py

index e048f1d551fe07819cb0114c2f9e694b219d84e6..59649c9c11add140bdfa00666857b1add41a9197 100644 (file)
@@ -119,7 +119,7 @@ class ValidatedBucket:
         if not self._share_hash:
             d1 = self.bucket.get_share_hashes()
         else:
-            d1 = defer.succeed(None)
+            d1 = defer.succeed([])
 
         # we might need to grab some elements of our block hash tree, to
         # validate the requested block up to the share hash
@@ -149,9 +149,12 @@ class ValidatedBucket:
                 sht.set_hashes(sh)
                 self._share_hash = sht.get_leaf(self.sharenum)
 
-            #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d" %
-            #        (self.sharenum, blocknum, len(blockdata)))
             blockhash = hashutil.block_hash(blockdata)
+            #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d "
+            #        "%r .. %r: %s" %
+            #        (self.sharenum, blocknum, len(blockdata),
+            #         blockdata[:50], blockdata[-50:], idlib.b2a(blockhash)))
+
             # we always validate the blockhash
             bh = dict(enumerate(blockhashes))
             # replace blockhash root with validated value
@@ -163,20 +166,33 @@ class ValidatedBucket:
             # likely a programming error
             log.msg("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
-            #log.msg(" block length: %d" % len(blockdata))
-            #log.msg(" block hash: %s" % idlib.b2a_or_none(blockhash)) # not safe
-            #log.msg(" block data: %r" % (blockdata,))
-            #log.msg(" root hash: %s" % idlib.b2a(self._roothash))
-            #log.msg(" share hash tree:\n" + self.share_hash_tree.dump())
-            #log.msg(" block hash tree:\n" + self.block_hash_tree.dump())
-            #lines = []
-            #for i,h in sorted(sharehashes):
-            #    lines.append("%3d: %s" % (i, idlib.b2a_or_none(h)))
-            #log.msg(" sharehashes:\n" + "\n".join(lines) + "\n")
-            #lines = []
-            #for i,h in enumerate(blockhashes):
-            #    lines.append("%3d: %s" % (i, idlib.b2a_or_none(h)))
-            #log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
+            if self._share_hash:
+                log.msg(""" failure occurred when checking the block_hash_tree.
+                This suggests that either the block data was bad, or that the
+                block hashes we received along with it were bad.""")
+            else:
+                log.msg(""" the failure probably occurred when checking the
+                share_hash_tree, which suggests that the share hashes we
+                received from the remote peer were bad.""")
+            log.msg(" have self._share_hash: %s" % bool(self._share_hash))
+            log.msg(" block length: %d" % len(blockdata))
+            log.msg(" block hash: %s" % idlib.b2a_or_none(blockhash)) # not safe
+            if len(blockdata) < 100:
+                log.msg(" block data: %r" % (blockdata,))
+            else:
+                log.msg(" block data start/end: %r .. %r" %
+                        (blockdata[:50], blockdata[-50:]))
+            log.msg(" root hash: %s" % idlib.b2a(self._roothash))
+            log.msg(" share hash tree:\n" + self.share_hash_tree.dump())
+            log.msg(" block hash tree:\n" + self.block_hash_tree.dump())
+            lines = []
+            for i,h in sorted(sharehashes):
+                lines.append("%3d: %s" % (i, idlib.b2a_or_none(h)))
+            log.msg(" sharehashes:\n" + "\n".join(lines) + "\n")
+            lines = []
+            for i,h in enumerate(blockhashes):
+                lines.append("%3d: %s" % (i, idlib.b2a_or_none(h)))
+            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
             raise
 
         # If we made it here, the block is good. If the hash trees didn't
index f3af448b9c42d104dd796b70154d0fd5715e19ad..3f643964c265a7c1f57f7223830766c9217bdfd0 100644 (file)
@@ -302,6 +302,11 @@ class Encoder(object):
             d = self.send_subshare(shareid, segnum, subshare)
             dl.append(d)
             subshare_hash = hashutil.block_hash(subshare)
+            #from allmydata.util import idlib
+            #log.msg("creating block (shareid=%d, blocknum=%d) "
+            #        "len=%d %r .. %r: %s" %
+            #        (shareid, segnum, len(subshare),
+            #         subshare[:50], subshare[-50:], idlib.b2a(subshare_hash)))
             self.subshare_hashes[shareid].append(subshare_hash)
         dl = self._gather_responses(dl)
         def _logit(res):
index 74b4c894b5853fa2256f05a6895ee675c46683a5..f3e92d151428a0b7c10a006c9a783b5952d4ac3f 100644 (file)
@@ -6,9 +6,8 @@ from twisted.internet import defer
 
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
-     RIBucketReader, IStorageBucketWriter, IStorageBucketReader
-from allmydata import interfaces
-from allmydata.util import fileutil, idlib
+     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
+from allmydata.util import fileutil, idlib, mathutil
 from allmydata.util.assertutil import precondition
 
 # store/
@@ -151,22 +150,23 @@ a series of four-byte big-endian offset values, which indicate where each
 section starts. Each offset is measured from the beginning of the file.
 
 0x00: segment size
-0x04: offset of data (=00 00 00 1c)
-0x08: offset of plaintext_hash_tree
-0x0c: offset of crypttext_hash_tree
-0x10: offset of block_hashes
-0x14: offset of share_hashes
-0x18: offset of uri_extension_length + uri_extension
-0x1c: start of data
-      start of plaintext_hash_tree
-      start of crypttext_hash_tree
-      start of block_hashes
-      start of share_hashes
+0x04: data size
+0x08: offset of data (=00 00 00 1c)
+0x0c: offset of plaintext_hash_tree
+0x10: offset of crypttext_hash_tree
+0x14: offset of block_hashes
+0x18: offset of share_hashes
+0x1c: offset of uri_extension_length + uri_extension
+0x20: start of data
+?   : start of plaintext_hash_tree
+?   : start of crypttext_hash_tree
+?   : start of block_hashes
+?   : start of share_hashes
        each share_hash is written as a two-byte (big-endian) hashnum
        followed by the 32-byte SHA-256 hash. We only store the hashes
        necessary to validate the share hash root
-      start of uri_extension_length (four-byte big-endian value)
-      start of uri_extension
+?   : start of uri_extension_length (four-byte big-endian value)
+?   : start of uri_extension
 """
 
 def allocated_size(data_size, num_segments, num_share_hashes,
@@ -181,10 +181,10 @@ class WriteBucketProxy:
     def __init__(self, rref, data_size, segment_size, num_segments,
                  num_share_hashes, uri_extension_size):
         self._rref = rref
+        self._data_size = data_size
         self._segment_size = segment_size
         self._num_segments = num_segments
 
-        HASH_SIZE = interfaces.HASH_SIZE
         self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
         # how many share hashes are included in each share? This will be
         # about ln2(num_shares).
@@ -193,7 +193,7 @@ class WriteBucketProxy:
         self._uri_extension_size = uri_extension_size
 
         offsets = self._offsets = {}
-        x = 0x1c
+        x = 0x20
         offsets['data'] = x
         x += data_size
         offsets['plaintext_hash_tree'] = x
@@ -206,16 +206,17 @@ class WriteBucketProxy:
         x += self._share_hash_size
         offsets['uri_extension'] = x
 
-        offset_data = struct.pack(">LLLLLLL",
+        offset_data = struct.pack(">LLLLLLLL",
                                   segment_size,
+                                  data_size,
                                   offsets['data'],
                                   offsets['plaintext_hash_tree'],
                                   offsets['crypttext_hash_tree'],
                                   offsets['block_hashes'],
                                   offsets['share_hashes'],
-                                  offsets['uri_extension']
+                                  offsets['uri_extension'],
                                   )
-        assert len(offset_data) == 7*4
+        assert len(offset_data) == 8*4
         self._offset_data = offset_data
 
     def start(self):
@@ -229,7 +230,9 @@ class WriteBucketProxy:
             precondition(len(data) == self._segment_size,
                          len(data), self._segment_size)
         else:
-            precondition(len(data) <= self._segment_size,
+            precondition(len(data) == (self._data_size -
+                                       (self._segment_size *
+                                        (self._num_segments - 1))),
                          len(data), self._segment_size)
         return self._write(offset, data)
 
@@ -298,17 +301,19 @@ class ReadBucketProxy:
 
     def start(self):
         # TODO: for small shares, read the whole bucket in start()
-        d = self._read(0, 7*4)
+        d = self._read(0, 8*4)
         self._offsets = {}
         def _got_offsets(data):
             self._segment_size = struct.unpack(">L", data[0:4])[0]
-            x = 4
+            self._data_size = struct.unpack(">L", data[4:8])[0]
+            x = 0x08
             for field in ( 'data',
                            'plaintext_hash_tree',
                            'crypttext_hash_tree',
                            'block_hashes',
                            'share_hashes',
-                           'uri_extension' ):
+                           'uri_extension',
+                           ):
                 offset = struct.unpack(">L", data[x:x+4])[0]
                 x += 4
                 self._offsets[field] = offset
@@ -316,13 +321,20 @@ class ReadBucketProxy:
         return d
 
     def get_block(self, blocknum):
+        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
+        if blocknum < num_segments-1:
+            size = self._segment_size
+        else:
+            size = self._data_size % self._segment_size
+            if size == 0:
+                size = self._segment_size
         offset = self._offsets['data'] + blocknum * self._segment_size
-        return self._read(offset, self._segment_size)
+        return self._read(offset, size)
 
     def _str2l(self, s):
         """ split string (pulled from storage) into a list of blockids """
-        return [ s[i:i+interfaces.HASH_SIZE]
-                 for i in range(0, len(s), interfaces.HASH_SIZE) ]
+        return [ s[i:i+HASH_SIZE]
+                 for i in range(0, len(s), HASH_SIZE) ]
 
     def get_plaintext_hashes(self):
         offset = self._offsets['plaintext_hash_tree']
@@ -348,7 +360,6 @@ class ReadBucketProxy:
     def get_share_hashes(self):
         offset = self._offsets['share_hashes']
         size = self._offsets['uri_extension'] - offset
-        HASH_SIZE = interfaces.HASH_SIZE
         assert size % (2+HASH_SIZE) == 0
         d = self._read(offset, size)
         def _unpack_share_hashes(data):
index 7d6afd42b255a168ede3ac8de331a8e45143b6d6..01e282a8ca96a88f2bfb82a8a8cbecec6a59c094 100644 (file)
@@ -102,7 +102,7 @@ class BucketProxy(unittest.TestCase):
 
         bw, rb, final = self.make_bucket("test_readwrite", 1406)
         bp = WriteBucketProxy(rb,
-                              data_size=100,
+                              data_size=95,
                               segment_size=25,
                               num_segments=4,
                               num_share_hashes=3,
@@ -112,7 +112,7 @@ class BucketProxy(unittest.TestCase):
         d.addCallback(lambda res: bp.put_block(0, "a"*25))
         d.addCallback(lambda res: bp.put_block(1, "b"*25))
         d.addCallback(lambda res: bp.put_block(2, "c"*25))
-        d.addCallback(lambda res: bp.put_block(3, "d"*25))
+        d.addCallback(lambda res: bp.put_block(3, "d"*20))
         d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
@@ -136,7 +136,7 @@ class BucketProxy(unittest.TestCase):
             d1.addCallback(lambda res: rbp.get_block(2))
             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
             d1.addCallback(lambda res: rbp.get_block(3))
-            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*25))
+            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
 
             d1.addCallback(lambda res: rbp.get_plaintext_hashes())
             d1.addCallback(lambda res:
index 92a087632fd32402813b0d85e27af9a2d43287a8..eaddffa716a1a2bfbb9b31c158e325281bcd131d 100644 (file)
@@ -4,7 +4,7 @@ from twisted.python.failure import Failure
 from twisted.internet import defer
 from cStringIO import StringIO
 
-from allmydata import upload, encode, storageserver
+from allmydata import upload, encode
 from allmydata.uri import unpack_uri, unpack_lit
 from allmydata.util.assertutil import precondition
 from foolscap import eventual
@@ -35,7 +35,7 @@ class FakeStorageServer:
         return d
 
     def allocate_buckets(self, crypttext_hash, sharenums,
-                         share_size, blocksize, canary):
+                         share_size, canary):
         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
         if self.mode == "full":
             return (set(), {},)