immutable: refactor downloader to be more reusable for checker/verifier/repairer...
author	Zooko O'Whielacronx <zooko@zooko.com>
Mon, 5 Jan 2009 16:51:45 +0000 (09:51 -0700)
committer	Zooko O'Whielacronx <zooko@zooko.com>
Mon, 5 Jan 2009 16:51:45 +0000 (09:51 -0700)
The code for validating the share hash tree and the block hash tree has been rewritten to make sure it handles all cases, to share metadata about the file (such as the share hash tree, block hash trees, and UEB) among different share downloads, and to avoid requiring hashes to be stored on the server unnecessarily: the roots of the block hash trees are not needed (they are also the leaves of the share hash tree), and the root of the share hash tree is not needed (it is included in the UEB).  It also passes the latest tests, including those that exercise handling of corrupted shares.
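
For readers tracing those hash relationships, here is a minimal sketch using the IncompleteHashTree API that appears in the download.py diff below (connect_trees and its arguments are invented for this illustration; it is not part of the patch):

    from allmydata import hashtree

    def connect_trees(share_root_hash, sharehashes, total_shares, num_blocks, sharenum):
        """share_root_hash comes from the validated UEB; sharehashes is a dict
        mapping hashnum to hash as fetched from the storage server."""
        share_hash_tree = hashtree.IncompleteHashTree(total_shares)
        share_hash_tree.set_hashes({0: share_root_hash}) # root comes from the UEB, never the server
        share_hash_tree.set_hashes(sharehashes)          # raises if they don't match that root
        # The leaf of the share hash tree for this share number *is* the root
        # of this share's block hash tree, so the server need not store it.
        share_hash = share_hash_tree.get_leaf(sharenum)
        if not share_hash:
            raise hashtree.NotEnoughHashesError()
        block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
        block_hash_tree.set_hashes({0: share_hash})
        return share_hash_tree, block_hash_tree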

ValidatedReadBucketProxy takes a share_hash_tree argument to its constructor, which is a reference to a share hash tree shared by all ValidatedReadBucketProxies for that immutable file download.
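
A condensed sketch of how that shared tree is set up, based on the FileDownloader change near the end of the download.py diff below (make_vbuckets is an invented name; the real code lives inside FileDownloader._download_all_segments):

    from allmydata import hashtree
    from allmydata.immutable.download import ValidatedReadBucketProxy

    def make_vbuckets(share_buckets, vup, total_shares):
        """share_buckets yields (sharenum, bucket) pairs; vup is the file's
        ValidatedExtendedURIProxy."""
        # One IncompleteHashTree, seeded with the root hash from the validated
        # UEB, is shared by every proxy for this download, so share hashes
        # fetched while validating one share help validate the others.
        share_hash_tree = hashtree.IncompleteHashTree(total_shares)
        share_hash_tree.set_hashes({0: vup.share_root_hash})
        vbuckets = {}
        for sharenum, bucket in share_buckets:
            vbucket = ValidatedReadBucketProxy(sharenum, bucket, share_hash_tree,
                                               vup.num_segments, vup.block_size,
                                               vup.share_size)
            vbuckets.setdefault(sharenum, set()).add(vbucket)
        return vbuckets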

ValidatedReadBucketProxy requires the block_size and share_size to be provided in its constructor, and it then uses those to compute the offsets and lengths of blocks when it needs them, instead of reading those values out of the share.  The user of ValidatedReadBucketProxy therefore has to have first used a ValidatedExtendedURIProxy to compute those two values from the validated contents of the URI.  This pleasingly simplifies the safety analysis: the client knows which span of bytes corresponds to a given block from the validated URI data, rather than from the unvalidated data stored on the storage server.  It also simplifies unit testing of the verifier/repairer, because now they don't care about the contents of the "share size" and "block size" fields in the share.  It does not remove the need for the share data v2 layout, because we still need to store and retrieve the offsets of the fields which come after the share data, so we still need share data v2 with its 8-byte fields if we want to store shares larger than about 2^32 bytes.
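
A sketch of the resulting arithmetic (the helper below is invented, but it mirrors the new get_block()/get_block_data() code in the diffs below): every block except possibly the last is block_size bytes long, and its offset within the share data is simply blocknum * block_size.

    def block_offset_and_length(blocknum, num_blocks, block_size, share_size):
        offset = blocknum * block_size
        if blocknum < num_blocks - 1:
            length = block_size
        else:
            # The final block holds whatever is left over, or a full block if
            # the share size is an exact multiple of the block size.
            length = share_size % block_size or block_size
        return offset, length

    # e.g. a 95-byte share cut into 25-byte blocks (the sizes used in the
    # test_storage.py change below): block 3 starts at offset 75, length 20.
    assert block_offset_and_length(3, 4, 25, 95) == (75, 20)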

Callers can now specify which subset of the block hashes and share hashes they need while downloading a particular share.  In the future this will hopefully be used to fetch only that subset, for network efficiency, but currently all of them are fetched regardless of which subset is specified.
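
For example, the new get_block() code in the download.py diff below computes the subset for one block roughly like this (the concrete numbers are made up):

    from allmydata import hashtree

    num_blocks, blocknum = 4, 2
    block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
    # Ask for every hash needed to validate this block, including its leaf...
    needed = block_hash_tree.needed_hashes(blocknum, include_leaf=True)
    # ...but not the root (hash 0), which arrives as a leaf of the share hash
    # tree rather than from the block-hash section of the share.
    needed.discard(0)
    # `needed` can then be passed to get_block_hashes(needed); for now the
    # reader still fetches all of the block hashes regardless.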

ReadBucketProxy now hides from its user the question of whether or not it has "started" (sent a request to the server to get metadata).
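
Condensed sketch of the pattern, as it appears in the new ReadBucketProxy code in the layout.py diff below (LazyReader and fetch_and_parse are stand-ins for the real class and its header/UEB-fetching chain):

    from allmydata.util import observer

    class LazyReader:
        def __init__(self):
            self._started = False                         # request sent to the server?
            self._ready = observer.OneShotObserverList()  # response received and parsed?

        def _start_if_needed(self):
            # Every public getter calls this first: the first caller triggers
            # the fetch, and later callers just wait on the same observer list.
            if not self._started:
                self._started = True
                d = self.fetch_and_parse()   # stand-in for the real header/UEB chain
                d.addCallbacks(self._ready.fire, self._ready.fire)
            return self._ready.when_fired()

        def get_share_hashes(self):
            d = self._start_if_needed()
            d.addCallback(lambda ignored: self._share_hashes)  # filled in by the parser
            return d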

Download is optimized to do as few roundtrips and as few requests as possible, hopefully speeding up download a bit; the test_immutable.py change below now expects a whole-file download from 3 shares to take no more than 10 reads.

src/allmydata/immutable/download.py
src/allmydata/immutable/encode.py
src/allmydata/immutable/layout.py
src/allmydata/interfaces.py
src/allmydata/offloaded.py
src/allmydata/scripts/debug.py
src/allmydata/test/test_encode.py
src/allmydata/test/test_hashtree.py
src/allmydata/test/test_immutable.py
src/allmydata/test/test_storage.py

diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py
index 4e84bdc5e257159ec176105111400de5f9ca535e..595818ecee53ccb32bf78ab986c0caa0f137757b 100644
@@ -7,7 +7,7 @@ from twisted.application import service
 from foolscap import DeadReferenceError
 from foolscap.eventual import eventually
 
-from allmydata.util import base32, mathutil, hashutil, log
+from allmydata.util import base32, deferredutil, mathutil, hashutil, log
 from allmydata.util.assertutil import _assert, precondition
 from allmydata.util.rrefutil import ServerFailure
 from allmydata import codec, hashtree, uri
@@ -134,7 +134,8 @@ class ValidatedThingObtainer:
         if not self._validatedthingproxies:
             raise NotEnoughSharesError("ran out of peers, last error was %s" % (f,))
         # try again with a different one
-        return self._try_the_next_one()
+        d = self._try_the_next_one()
+        return d
 
     def _try_the_next_one(self):
         vtp = self._validatedthingproxies.pop(0)
@@ -167,8 +168,7 @@ class ValidatedCrypttextHashTreeProxy:
         return self
 
     def start(self):
-        d = self._readbucketproxy.startIfNecessary()
-        d.addCallback(lambda ignored: self._readbucketproxy.get_crypttext_hashes())
+        d = self._readbucketproxy.get_crypttext_hashes()
         d.addCallback(self._validate)
         return d
 
@@ -333,97 +333,104 @@ class ValidatedExtendedURIProxy:
         """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
         it.  Returns a deferred which is called back with self once the fetch is successful, or
         is erred back if it fails. """
-        d = self._readbucketproxy.startIfNecessary()
-        d.addCallback(lambda ignored: self._readbucketproxy.get_uri_extension())
+        d = self._readbucketproxy.get_uri_extension()
         d.addCallback(self._check_integrity)
         d.addCallback(self._parse_and_validate)
         return d
 
 class ValidatedReadBucketProxy(log.PrefixingLogMixin):
-    """I am a front-end for a remote storage bucket, responsible for
-    retrieving and validating data from that bucket.
+    """I am a front-end for a remote storage bucket, responsible for retrieving and validating
+    data from that bucket.
 
     My get_block() method is used by BlockDownloaders.
     """
 
-    def __init__(self, sharenum, bucket,
-                 share_hash_tree, share_root_hash,
-                 num_blocks):
-        """ share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """
+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
+        """ share_hash_tree is required to have already been initialized with the root hash
+        (the number-0 hash), using the share_root_hash from the UEB """
+        precondition(share_hash_tree[0] is not None, share_hash_tree)
         prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
         self.sharenum = sharenum
         self.bucket = bucket
-        self._share_hash = None # None means not validated yet
         self.share_hash_tree = share_hash_tree
-        self._share_root_hash = share_root_hash
-        self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
-        self.started = False
+        self.num_blocks = num_blocks
+        self.block_size = block_size
+        self.share_size = share_size
+        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
 
     def get_block(self, blocknum):
-        if not self.started:
-            d = self.bucket.start()
-            def _started(res):
-                self.started = True
-                return self.get_block(blocknum)
-            d.addCallback(_started)
-            return d
-
         # the first time we use this bucket, we need to fetch enough elements
         # of the share hash tree to validate it from our share hash up to the
         # hashroot.
-        if not self._share_hash:
+        if self.share_hash_tree.needed_hashes(self.sharenum):
             d1 = self.bucket.get_share_hashes()
         else:
             d1 = defer.succeed([])
 
-        # we might need to grab some elements of our block hash tree, to
-        # validate the requested block up to the share hash
-        needed = self.block_hash_tree.needed_hashes(blocknum)
-        if needed:
-            # TODO: get fewer hashes, use get_block_hashes(needed)
-            d2 = self.bucket.get_block_hashes()
-        else:
-            d2 = defer.succeed([])
-
-        d3 = self.bucket.get_block(blocknum)
+        # We might need to grab some elements of our block hash tree, to
+        # validate the requested block up to the share hash.
+        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
+        # We don't need the root of the block hash tree, as that comes in the share tree.
+        blockhashesneeded.discard(0)
+        d2 = self.bucket.get_block_hashes(blockhashesneeded)
 
-        d = defer.gatherResults([d1, d2, d3])
-        d.addCallback(self._got_data, blocknum)
-        return d
+        if blocknum < self.num_blocks-1:
+            thisblocksize = self.block_size
+        else:
+            thisblocksize = self.share_size % self.block_size
+            if thisblocksize == 0:
+                thisblocksize = self.block_size
+        d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
+
+        dl = deferredutil.gatherResults([d1, d2, d3])
+        dl.addCallback(self._got_data, blocknum)
+        return dl
+
+    def _got_data(self, results, blocknum):
+        precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
+        sharehashes, blockhashes, blockdata = results
+        try:
+            sharehashes = dict(sharehashes)
+        except ValueError, le:
+            le.args = tuple(le.args + (sharehashes,))
+            raise
+        blockhashes = dict(enumerate(blockhashes))
 
-    def _got_data(self, res, blocknum):
-        sharehashes, blockhashes, blockdata = res
-        blockhash = None # to make logging it safe
+        candidate_share_hash = None # in case we log it in the except block below
+        blockhash = None # in case we log it in the except block below
 
         try:
-            if not self._share_hash:
-                sh = dict(sharehashes)
-                sh[0] = self._share_root_hash # always use our own root, from the URI
-                sht = self.share_hash_tree
-                if sht.get_leaf_index(self.sharenum) not in sh:
-                    raise hashtree.NotEnoughHashesError
-                sht.set_hashes(sh)
-                self._share_hash = sht.get_leaf(self.sharenum)
+            if self.share_hash_tree.needed_hashes(self.sharenum):
+                # This will raise exception if the values being passed do not match the root
+                # node of self.share_hash_tree.
+                self.share_hash_tree.set_hashes(sharehashes)
+
+            # To validate a block we need the root of the block hash tree, which is also one of
+            # the leafs of the share hash tree, and is called "the share hash".
+            if not self.block_hash_tree[0]: # empty -- no root node yet
+                # Get the share hash from the share hash tree.
+                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
+                if not share_hash:
+                    raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
+                self.block_hash_tree.set_hashes({0: share_hash})
+
+            if self.block_hash_tree.needed_hashes(blocknum):
+                self.block_hash_tree.set_hashes(blockhashes)
 
             blockhash = hashutil.block_hash(blockdata)
+            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
             #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
             #        "%r .. %r: %s" %
             #        (self.sharenum, blocknum, len(blockdata),
             #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
 
-            # we always validate the blockhash
-            bh = dict(enumerate(blockhashes))
-            # replace blockhash root with validated value
-            bh[0] = self._share_hash
-            self.block_hash_tree.set_hashes(bh, {blocknum: blockhash})
-
         except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
             # log.WEIRD: indicates undetected disk/network error, or more
             # likely a programming error
             self.log("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
-            if self._share_hash:
+            if self.block_hash_tree.needed_hashes(blocknum):
                 self.log(""" failure occurred when checking the block_hash_tree.
                 This suggests that either the block data was bad, or that the
                 block hashes we received along with it were bad.""")
@@ -431,7 +438,7 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
                 self.log(""" the failure probably occurred when checking the
                 share_hash_tree, which suggests that the share hashes we
                 received from the remote peer were bad.""")
-            self.log(" have self._share_hash: %s" % bool(self._share_hash))
+            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
             self.log(" block length: %d" % len(blockdata))
             self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
             if len(blockdata) < 100:
@@ -439,15 +446,14 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
             else:
                 self.log(" block data start/end: %r .. %r" %
                         (blockdata[:50], blockdata[-50:]))
-            self.log(" root hash: %s" % base32.b2a(self._share_root_hash))
             self.log(" share hash tree:\n" + self.share_hash_tree.dump())
             self.log(" block hash tree:\n" + self.block_hash_tree.dump())
             lines = []
-            for i,h in sorted(sharehashes):
+            for i,h in sorted(sharehashes.items()):
                 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
             self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
             lines = []
-            for i,h in enumerate(blockhashes):
+            for i,h in blockhashes.items():
                 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
             log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
             raise BadOrMissingHash(le)
@@ -495,7 +501,7 @@ class BlockDownloader(log.PrefixingLogMixin):
         self.parent.hold_block(self.blocknum, data)
 
     def _got_block_error(self, f):
-        failtype = f.trap(ServerFailure, IntegrityCheckReject, layout.LayoutInvalid)
+        failtype = f.trap(ServerFailure, IntegrityCheckReject, layout.LayoutInvalid, layout.ShareVersionIncompatible)
         if f.check(ServerFailure):
             level = log.UNUSUAL
         else:
@@ -679,6 +685,7 @@ class FileDownloader(log.PrefixingLogMixin):
 
         self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
 
+        self._share_hash_tree = None
         self._crypttext_hash_tree = None
 
     def pauseProducing(self):
@@ -840,8 +847,8 @@ class FileDownloader(log.PrefixingLogMixin):
 
             self._current_segnum = 0
 
-            self._share_hashtree = hashtree.IncompleteHashTree(self._uri.total_shares)
-            self._share_hashtree.set_hashes({0: vup.share_root_hash})
+            self._share_hash_tree = hashtree.IncompleteHashTree(self._uri.total_shares)
+            self._share_hash_tree.set_hashes({0: vup.share_root_hash})
 
             self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
             self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
@@ -897,12 +904,8 @@ class FileDownloader(log.PrefixingLogMixin):
 
     def _download_all_segments(self, res):
         for sharenum, bucket in self._share_buckets:
-            vbucket = ValidatedReadBucketProxy(sharenum, bucket,
-                                      self._share_hashtree,
-                                      self._vup.share_root_hash,
-                                      self._vup.num_segments)
-            s = self._share_vbuckets.setdefault(sharenum, set())
-            s.add(vbucket)
+            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
+            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
 
         # after the above code, self._share_vbuckets contains enough
         # buckets to complete the download, and some extra ones to
diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py
index 0945ded98b57cd6056bbafe8059d49a80ebdf3a6..7bc4ce0f621d80acca48c11792312fad8c5c21df 100644
@@ -283,7 +283,7 @@ class Encoder(object):
         self.set_status("Starting shareholders")
         dl = []
         for shareid in self.landlords:
-            d = self.landlords[shareid].start()
+            d = self.landlords[shareid].put_header()
             d.addErrback(self._remove_shareholder, shareid, "start")
             dl.append(d)
         return self._gather_responses(dl)
diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py
index 1f5de5ab0db3d360570892d0f192595f2e1f54fd..d1c2da45c66ccf06978f4832a46ca6d39d8bb5b8 100644
@@ -3,7 +3,7 @@ from zope.interface import implements
 from twisted.internet import defer
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
      FileTooLargeError, HASH_SIZE
-from allmydata.util import mathutil, idlib
+from allmydata.util import log, mathutil, idlib, observer
 from allmydata.util.assertutil import precondition
 from allmydata import storage
 
@@ -29,8 +29,8 @@ big-endian offset values, which indicate where each section starts. Each offset
 the beginning of the share data.
 
 0x00: version number (=00 00 00 01)
-0x04: segment size
-0x08: data size
+0x04: block size # See Footnote 1 below.
+0x08: share data size # See Footnote 1 below.
 0x0c: offset of data (=00 00 00 24)
 0x10: offset of plaintext_hash_tree UNUSED
 0x14: offset of crypttext_hash_tree
@@ -43,7 +43,7 @@ the beginning of the share data.
 ?   : start of block_hashes
 ?   : start of share_hashes
        each share_hash is written as a two-byte (big-endian) hashnum
-       followed by the 32-byte SHA-256 hash. We only store the hashes
+       followed by the 32-byte SHA-256 hash. We store only the hashes
        necessary to validate the share hash root
 ?   : start of uri_extension_length (four-byte big-endian value)
 ?   : start of uri_extension
@@ -54,8 +54,8 @@ v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
 limitations described in #346.
 
 0x00: version number (=00 00 00 02)
-0x04: segment size
-0x0c: data size
+0x04: block size # See Footnote 1 below.
+0x0c: share data size # See Footnote 1 below.
 0x14: offset of data (=00 00 00 00 00 00 00 44)
 0x1c: offset of plaintext_hash_tree UNUSED
 0x24: offset of crypttext_hash_tree
@@ -68,23 +68,26 @@ limitations described in #346.
 ?   : start of uri_extension_length (eight-byte big-endian value)
 """
 
+# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but they are still
+# provided when writing so that older versions of Tahoe can read them.
+
 def allocated_size(data_size, num_segments, num_share_hashes,
-                   uri_extension_size):
+                   uri_extension_size_max):
     wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
-                           uri_extension_size, None)
+                           uri_extension_size_max, None)
     uri_extension_starts_at = wbp._offsets['uri_extension']
-    return uri_extension_starts_at + wbp.fieldsize + uri_extension_size
+    return uri_extension_starts_at + wbp.fieldsize + uri_extension_size_max
 
 class WriteBucketProxy:
     implements(IStorageBucketWriter)
     fieldsize = 4
     fieldstruct = ">L"
 
-    def __init__(self, rref, data_size, segment_size, num_segments,
-                 num_share_hashes, uri_extension_size, nodeid):
+    def __init__(self, rref, data_size, block_size, num_segments,
+                 num_share_hashes, uri_extension_size_max, nodeid):
         self._rref = rref
         self._data_size = data_size
-        self._segment_size = segment_size
+        self._block_size = block_size
         self._num_segments = num_segments
         self._nodeid = nodeid
 
@@ -92,14 +95,14 @@ class WriteBucketProxy:
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
         # how many share hashes are included in each share? This will be
         # about ln2(num_shares).
-        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+        self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
         # we commit to not sending a uri extension larger than this
-        self._uri_extension_size = uri_extension_size
+        self._uri_extension_size_max = uri_extension_size_max
 
-        self._create_offsets(segment_size, data_size)
+        self._create_offsets(block_size, data_size)
 
-    def _create_offsets(self, segment_size, data_size):
-        if segment_size >= 2**32 or data_size >= 2**32:
+    def _create_offsets(self, block_size, data_size):
+        if block_size >= 2**32 or data_size >= 2**32:
             raise FileTooLargeError("This file is too large to be uploaded (data_size).")
 
         offsets = self._offsets = {}
@@ -113,7 +116,7 @@ class WriteBucketProxy:
         offsets['block_hashes'] = x
         x += self._segment_hash_size
         offsets['share_hashes'] = x
-        x += self._share_hash_size
+        x += self._share_hashtree_size
         offsets['uri_extension'] = x
 
         if x >= 2**32:
@@ -121,7 +124,7 @@ class WriteBucketProxy:
 
         offset_data = struct.pack(">LLLLLLLLL",
                                   1, # version number
-                                  segment_size,
+                                  block_size,
                                   data_size,
                                   offsets['data'],
                                   offsets['plaintext_hash_tree'], # UNUSED
@@ -140,21 +143,21 @@ class WriteBucketProxy:
             nodeid_s = "[None]"
         return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s
 
-    def start(self):
+    def put_header(self):
         return self._write(0, self._offset_data)
 
     def put_block(self, segmentnum, data):
-        offset = self._offsets['data'] + segmentnum * self._segment_size
+        offset = self._offsets['data'] + segmentnum * self._block_size
         assert offset + len(data) <= self._offsets['uri_extension']
         assert isinstance(data, str)
         if segmentnum < self._num_segments-1:
-            precondition(len(data) == self._segment_size,
-                         len(data), self._segment_size)
+            precondition(len(data) == self._block_size,
+                         len(data), self._block_size)
         else:
             precondition(len(data) == (self._data_size -
-                                       (self._segment_size *
+                                       (self._block_size *
                                         (self._num_segments - 1))),
-                         len(data), self._segment_size)
+                         len(data), self._block_size)
         return self._write(offset, data)
 
     def put_crypttext_hashes(self, hashes):
@@ -186,8 +189,8 @@ class WriteBucketProxy:
         assert isinstance(sharehashes, list)
         data = "".join([struct.pack(">H", hashnum) + hashvalue
                         for hashnum,hashvalue in sharehashes])
-        precondition(len(data) == self._share_hash_size,
-                     len(data), self._share_hash_size)
+        precondition(len(data) == self._share_hashtree_size,
+                     len(data), self._share_hashtree_size)
         precondition(offset + len(data) <= self._offsets['uri_extension'],
                      offset, len(data), offset+len(data),
                      self._offsets['uri_extension'])
@@ -196,8 +199,8 @@ class WriteBucketProxy:
     def put_uri_extension(self, data):
         offset = self._offsets['uri_extension']
         assert isinstance(data, str)
-        precondition(len(data) <= self._uri_extension_size,
-                     len(data), self._uri_extension_size)
+        precondition(len(data) <= self._uri_extension_size_max,
+                     len(data), self._uri_extension_size_max)
         length = struct.pack(self.fieldstruct, len(data))
         return self._write(offset, length+data)
 
@@ -215,8 +218,8 @@ class WriteBucketProxy_v2(WriteBucketProxy):
     fieldsize = 8
     fieldstruct = ">Q"
 
-    def _create_offsets(self, segment_size, data_size):
-        if segment_size >= 2**64 or data_size >= 2**64:
+    def _create_offsets(self, block_size, data_size):
+        if block_size >= 2**64 or data_size >= 2**64:
             raise FileTooLargeError("This file is too large to be uploaded (data_size).")
 
         offsets = self._offsets = {}
@@ -230,7 +233,7 @@ class WriteBucketProxy_v2(WriteBucketProxy):
         offsets['block_hashes'] = x
         x += self._segment_hash_size
         offsets['share_hashes'] = x
-        x += self._share_hash_size
+        x += self._share_hashtree_size
         offsets['uri_extension'] = x
 
         if x >= 2**64:
@@ -238,7 +241,7 @@ class WriteBucketProxy_v2(WriteBucketProxy):
 
         offset_data = struct.pack(">LQQQQQQQQ",
                                   2, # version number
-                                  segment_size,
+                                  block_size,
                                   data_size,
                                   offsets['data'],
                                   offsets['plaintext_hash_tree'], # UNUSED
@@ -252,13 +255,17 @@ class WriteBucketProxy_v2(WriteBucketProxy):
 
 class ReadBucketProxy:
     implements(IStorageBucketReader)
+
+    MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
+
     def __init__(self, rref, peerid, storage_index):
         self._rref = rref
         self._peerid = peerid
         peer_id_s = idlib.shortnodeid_b2a(peerid)
         storage_index_s = storage.si_b2a(storage_index)
-        self._reprstr = "<ReadBucketProxy to peer [%s] SI %s>" % (peer_id_s, storage_index_s)
-        self._started = False
+        self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
+        self._started = False # sent request to server
+        self._ready = observer.OneShotObserverList() # got response from server
 
     def get_peerid(self):
         return self._peerid
@@ -266,23 +273,28 @@ class ReadBucketProxy:
     def __repr__(self):
         return self._reprstr
 
-    def startIfNecessary(self):
-        if self._started:
-            return defer.succeed(self)
-        d = self.start()
-        d.addCallback(lambda res: self)
-        return d
-
-    def start(self):
-        # TODO: for small shares, read the whole bucket in start()
-        d = self._read(0, 0x44)
+    def _start_if_needed(self):
+        """ Returns a deferred that will be fired when I'm ready to return data, or errbacks if
+        the starting (header reading and parsing) process fails."""
+        if not self._started:
+            self._start()
+        return self._ready.when_fired()
+
+    def _start(self):
+        self._started = True
+        # TODO: for small shares, read the whole bucket in _start()
+        d = self._fetch_header()
         d.addCallback(self._parse_offsets)
-        def _started(res):
-            self._started = True
-            return res
-        d.addCallback(_started)
+        d.addCallback(self._fetch_sharehashtree_and_ueb)
+        d.addCallback(self._parse_sharehashtree_and_ueb)
+        def _fail_waiters(f):
+            self._ready.fire(f)
+        d.addErrback(_fail_waiters)
         return d
 
+    def _fetch_header(self):
+        return self._read(0, 0x44)
+
     def _parse_offsets(self, data):
         precondition(len(data) >= 0x4)
         self._offsets = {}
@@ -295,15 +307,11 @@ class ReadBucketProxy:
             x = 0x0c
             fieldsize = 0x4
             fieldstruct = ">L"
-            (self._segment_size,
-             self._data_size) = struct.unpack(">LL", data[0x4:0xc])
         else:
             precondition(len(data) >= 0x44)
             x = 0x14
             fieldsize = 0x8
             fieldstruct = ">Q"
-            (self._segment_size,
-             self._data_size) = struct.unpack(">QQ", data[0x4:0x14])
 
         self._version = version
         self._fieldsize = fieldsize
@@ -321,67 +329,86 @@ class ReadBucketProxy:
             self._offsets[field] = offset
         return self._offsets
 
-    def get_block(self, blocknum):
-        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
-        if blocknum < num_segments-1:
-            size = self._segment_size
-        else:
-            size = self._data_size % self._segment_size
-            if size == 0:
-                size = self._segment_size
-        offset = self._offsets['data'] + blocknum * self._segment_size
-        return self._read(offset, size)
+    def _fetch_sharehashtree_and_ueb(self, offsets):
+        sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
+        return self._read(offsets['share_hashes'], self.MAX_UEB_SIZE+sharehashtree_size)
+
+    def _parse_sharehashtree_and_ueb(self, data):
+        sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
+        if len(data) < sharehashtree_size:
+            raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
+        if sharehashtree_size % (2+HASH_SIZE) != 0:
+            raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
+        self._share_hashes = []
+        for i in range(0, sharehashtree_size, 2+HASH_SIZE):
+            hashnum = struct.unpack(">H", data[i:i+2])[0]
+            hashvalue = data[i+2:i+2+HASH_SIZE]
+            self._share_hashes.append( (hashnum, hashvalue) )
+
+        i = self._offsets['uri_extension']-self._offsets['share_hashes']
+        if len(data) < i+self._fieldsize:
+            raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
+        length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
+        self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
+
+        self._ready.fire(self)
+
+    def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
+        offset = self._offsets['data'] + blocknum * blocksize
+        return self._read(offset, thisblocksize)
+
+    def get_block_data(self, blocknum, blocksize, thisblocksize):
+        d = self._start_if_needed()
+        d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
+        return d
 
     def _str2l(self, s):
         """ split string (pulled from storage) into a list of blockids """
         return [ s[i:i+HASH_SIZE]
                  for i in range(0, len(s), HASH_SIZE) ]
 
-    def get_crypttext_hashes(self):
+    def _get_crypttext_hashes(self, unused=None):
         offset = self._offsets['crypttext_hash_tree']
         size = self._offsets['block_hashes'] - offset
         d = self._read(offset, size)
         d.addCallback(self._str2l)
         return d
 
-    def get_block_hashes(self):
+    def get_crypttext_hashes(self):
+        d = self._start_if_needed()
+        d.addCallback(self._get_crypttext_hashes)
+        return d
+
+    def _get_block_hashes(self, unused=None, at_least_these=()):
+        # TODO: fetch only at_least_these instead of all of them.
         offset = self._offsets['block_hashes']
         size = self._offsets['share_hashes'] - offset
         d = self._read(offset, size)
         d.addCallback(self._str2l)
         return d
 
+    def get_block_hashes(self, at_least_these=()):
+        if at_least_these:
+            d = self._start_if_needed()
+            d.addCallback(self._get_block_hashes, at_least_these)
+            return d
+        else:
+            return defer.succeed([])
+
+    def _get_share_hashes(self, unused=None):
+        return self._share_hashes
+
     def get_share_hashes(self):
-        offset = self._offsets['share_hashes']
-        size = self._offsets['uri_extension'] - offset
-        assert size % (2+HASH_SIZE) == 0
-        d = self._read(offset, size)
-        def _unpack_share_hashes(data):
-            assert len(data) == size
-            hashes = []
-            for i in range(0, size, 2+HASH_SIZE):
-                hashnum = struct.unpack(">H", data[i:i+2])[0]
-                hashvalue = data[i+2:i+2+HASH_SIZE]
-                hashes.append( (hashnum, hashvalue) )
-            return hashes
-        d.addCallback(_unpack_share_hashes)
+        d = self._start_if_needed()
+        d.addCallback(self._get_share_hashes)
         return d
 
+    def _get_uri_extension(self, unused=None):
+        return self._ueb_data
+
     def get_uri_extension(self):
-        offset = self._offsets['uri_extension']
-        d = self._read(offset, self._fieldsize)
-        def _got_length(data):
-            if len(data) != self._fieldsize:
-                raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
-            length = struct.unpack(self._fieldstruct, data)[0]
-            if length >= 2**31:
-                # URI extension blocks are around 419 bytes long, so this must be corrupted.
-                # Anyway, the foolscap interface schema for "read" will not allow >= 2**31 bytes
-                # length.
-                raise RidiculouslyLargeURIExtensionBlock(length)
-
-            return self._read(offset+self._fieldsize, length)
-        d.addCallback(_got_length)
+        d = self._start_if_needed()
+        d.addCallback(self._get_uri_extension)
         return d
 
     def _read(self, offset, length):
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 191004629f4f59741c95752756e9d0e1291f1929..a176e01c8dea4d7fbcdfad5e5bda40e250a0e16a 100644
@@ -306,7 +306,7 @@ class IStorageBucketWriter(Interface):
 
 class IStorageBucketReader(Interface):
 
-    def get_block(blocknum=int):
+    def get_block_data(blocknum=int, blocksize=int, size=int):
         """Most blocks will be the same size. The last block might be shorter
         than the others.
 
@@ -318,12 +318,12 @@ class IStorageBucketReader(Interface):
         @return: ListOf(Hash)
         """
 
-    def get_block_hashes():
+    def get_block_hashes(at_least_these=SetOf(int)):
         """
         @return: ListOf(Hash)
         """
 
-    def get_share_hashes():
+    def get_share_hashes(at_least_these=SetOf(int)):
         """
         @return: ListOf(TupleOf(int, Hash))
         """
diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py
index 048203d7e94c23670f00767eb53e8772659c61d5..10097e3aa30a5ac40d84228773a599ecde1c8b48 100644
@@ -88,8 +88,7 @@ class CHKCheckerAndUEBFetcher:
             return
         b,peerid = self._readers.pop()
         rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
-        d = rbp.startIfNecessary()
-        d.addCallback(lambda res: rbp.get_uri_extension())
+        d = rbp.get_uri_extension()
         d.addCallback(self._got_uri_extension)
         d.addErrback(self._ueb_error)
         return d
diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py
index 73a0d02867e0f7cebabc7461f7edcb864a9b7b58..b05cfd99a0aa4bffd136cff974935739767e814a 100644
@@ -105,7 +105,8 @@ def dump_immutable_share(options):
         print >>out, "%20s: %s" % ("verify-cap", verify_cap)
 
     sizes = {}
-    sizes['data'] = bp._data_size
+    sizes['data'] = (offsets['plaintext_hash_tree'] -
+                           offsets['data'])
     sizes['validation'] = (offsets['uri_extension'] -
                            offsets['plaintext_hash_tree'])
     sizes['uri-extension'] = len(UEB_data)
@@ -586,6 +587,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
         class ImmediateReadBucketProxy(ReadBucketProxy):
             def __init__(self, sf):
                 self.sf = sf
+                ReadBucketProxy.__init__(self, "", "", "")
             def __repr__(self):
                 return "<ImmediateReadBucketProxy>"
             def _read(self, offset, size):
@@ -594,7 +596,6 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
         # use a ReadBucketProxy to parse the bucket and find the uri extension
         sf = storage.ShareFile(abs_sharefile)
         bp = ImmediateReadBucketProxy(sf)
-        call(bp.start)
 
         expiration_time = min( [lease.expiration_time
                                 for lease in sf.iter_leases()] )
diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py
index 57822040d618854f854f83484be717a5134d356a..d3e4ca934b80c91d1179747824f83b21fc0f3590 100644
@@ -36,14 +36,15 @@ class FakeBucketReaderWriterProxy:
     def get_peerid(self):
         return "peerid"
 
-    def startIfNecessary(self):
-        return defer.succeed(self)
-    def start(self):
+    def _start(self):
         if self.mode == "lost-early":
             f = Failure(LostPeerError("I went away early"))
             return eventual.fireEventually(f)
         return defer.succeed(self)
 
+    def put_header(self):
+        return self._start()
+
     def put_block(self, segmentnum, data):
         if self.mode == "lost-early":
             f = Failure(LostPeerError("I went away early"))
@@ -99,41 +100,50 @@ class FakeBucketReaderWriterProxy:
     def abort(self):
         return defer.succeed(None)
 
-    def get_block(self, blocknum):
-        def _try():
+    def get_block_data(self, blocknum, blocksize, size):
+        d = self._start()
+        def _try(unused=None):
             assert isinstance(blocknum, (int, long))
             if self.mode == "bad block":
                 return flip_bit(self.blocks[blocknum])
             return self.blocks[blocknum]
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
     def get_plaintext_hashes(self):
-        def _try():
+        d = self._start()
+        def _try(unused=None):
             hashes = self.plaintext_hashes[:]
             return hashes
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
     def get_crypttext_hashes(self):
-        def _try():
+        d = self._start()
+        def _try(unused=None):
             hashes = self.crypttext_hashes[:]
             if self.mode == "bad crypttext hashroot":
                 hashes[0] = flip_bit(hashes[0])
             if self.mode == "bad crypttext hash":
                 hashes[1] = flip_bit(hashes[1])
             return hashes
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
-    def get_block_hashes(self):
-        def _try():
+    def get_block_hashes(self, at_least_these=()):
+        d = self._start()
+        def _try(unused=None):
             if self.mode == "bad blockhash":
                 hashes = self.block_hashes[:]
                 hashes[1] = flip_bit(hashes[1])
                 return hashes
             return self.block_hashes
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
-    def get_share_hashes(self):
-        def _try():
+    def get_share_hashes(self, at_least_these=()):
+        d = self._start()
+        def _try(unused=None):
             if self.mode == "bad sharehash":
                 hashes = self.share_hashes[:]
                 hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
@@ -144,14 +154,17 @@ class FakeBucketReaderWriterProxy:
                 # download.py is supposed to guard against this case.
                 return []
             return self.share_hashes
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
     def get_uri_extension(self):
-        def _try():
+        d = self._start()
+        def _try(unused=None):
             if self.mode == "bad uri_extension":
                 return flip_bit(self.uri_extension)
             return self.uri_extension
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
 
 def make_data(length):
@@ -719,9 +732,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
         # the first 7 servers have bad block hashes, so the sharehash tree
         # will not validate, and the download will fail
         modemap = dict([(i, "bad sharehash")
-                        for i in range(7)]
-                       + [(i, "good")
-                          for i in range(7, 10)])
+                        for i in range(10)])
         d = self.send_and_recover((4,8,10), bucket_modes=modemap)
         def _done(res):
             self.failUnless(isinstance(res, Failure))
@@ -739,12 +750,10 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
         return self.send_and_recover((4,8,10), bucket_modes=modemap)
 
     def test_missing_sharehashes_failure(self):
-        # the first 7 servers are missing their sharehashes, so the
-        # sharehash tree will not validate, and the download will fail
+        # all servers are missing their sharehashes, so the sharehash tree will not validate,
+        # and the download will fail
         modemap = dict([(i, "missing sharehash")
-                        for i in range(7)]
-                       + [(i, "good")
-                          for i in range(7, 10)])
+                        for i in range(10)])
         d = self.send_and_recover((4,8,10), bucket_modes=modemap)
         def _done(res):
             self.failUnless(isinstance(res, Failure), res)
diff --git a/src/allmydata/test/test_hashtree.py b/src/allmydata/test/test_hashtree.py
index f5bbb49ba05849325cfdd9e8bf5e62b8baca69cc..5388c6e97744af05075f717c9750c91a6a746e10 100644
@@ -62,6 +62,24 @@ class Incomplete(unittest.TestCase):
         self.failUnlessRaises(IndexError, ht.get_leaf, 8)
         self.failUnlessEqual(ht.get_leaf_index(0), 7)
 
+    def test_needed_hashes(self):
+        ht = hashtree.IncompleteHashTree(8)
+        self.failUnlessEqual(ht.needed_hashes(0), set([8, 4, 2]))
+        self.failUnlessEqual(ht.needed_hashes(0, True), set([7, 8, 4, 2]))
+        self.failUnlessEqual(ht.needed_hashes(1), set([7, 4, 2]))
+        self.failUnlessEqual(ht.needed_hashes(7), set([13, 5, 1]))
+        self.failUnlessEqual(ht.needed_hashes(7, False), set([13, 5, 1]))
+        self.failUnlessEqual(ht.needed_hashes(7, True), set([14, 13, 5, 1]))
+        ht = hashtree.IncompleteHashTree(1)
+        self.failUnlessEqual(ht.needed_hashes(0), set([]))
+        ht = hashtree.IncompleteHashTree(6)
+        self.failUnlessEqual(ht.needed_hashes(0), set([8, 4, 2]))
+        self.failUnlessEqual(ht.needed_hashes(0, True), set([7, 8, 4, 2]))
+        self.failUnlessEqual(ht.needed_hashes(1), set([7, 4, 2]))
+        self.failUnlessEqual(ht.needed_hashes(5), set([11, 6, 1]))
+        self.failUnlessEqual(ht.needed_hashes(5, False), set([11, 6, 1]))
+        self.failUnlessEqual(ht.needed_hashes(5, True), set([12, 11, 6, 1]))
+
     def test_check(self):
         # first create a complete hash tree
         ht = make_tree(6)
@@ -160,4 +178,3 @@ class Incomplete(unittest.TestCase):
             iht.set_hashes(chain, leaves={4: tagged_hash("tag", "4")})
         except hashtree.BadHashError, e:
             self.fail("bad hash: %s" % e)
-
diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py
index 8f8984a4d2a05b8f3061dd8e6d73c5aaf0edc824..e9bdf0dc60f79b5f5994f244d279b1e978cc0215 100644
@@ -380,9 +380,10 @@ class Test(ShareManglingMixin, unittest.TestCase):
         before_download_reads = self._count_reads()
         def _after_download(unused=None):
             after_download_reads = self._count_reads()
-            # To pass this test, you have to download the file using only 10 reads to get the
-            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
-            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
+            # To pass this test, you have to download the file using only 10 reads total: 3 to
+            # get the headers from each share, 3 to get the share hash trees and uebs from each
+            # share, 1 to get the crypttext hashes, and 3 to get the block data from each share.
+            self.failIf(after_download_reads-before_download_reads > 10, (after_download_reads, before_download_reads))
         d.addCallback(self._download_and_check_plaintext)
         d.addCallback(_after_download)
         return d
@@ -403,7 +404,6 @@ class Test(ShareManglingMixin, unittest.TestCase):
         d.addCallback(self._download_and_check_plaintext)
         d.addCallback(_after_download)
         return d
-    test_download_from_only_3_remaining_shares.todo = "I think this test is failing due to the downloader code not knowing how to handle URI corruption and keeping going.  I'm going to commit new downloader code soon, and then see if this test starts passing."
 
     def test_download_abort_if_too_many_missing_shares(self):
         """ Test that download gives up quickly when it realizes there aren't enough shares out
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index 3548c5cbde6731ea23bfc479f5a485ead445db15..1b0d3ae31f8d5dbea36a06906b863878c5e44aa4 100644
@@ -125,10 +125,10 @@ class BucketProxy(unittest.TestCase):
         bw, rb, sharefname = self.make_bucket("test_create", 500)
         bp = WriteBucketProxy(rb,
                               data_size=300,
-                              segment_size=10,
+                              block_size=10,
                               num_segments=5,
                               num_share_hashes=3,
-                              uri_extension_size=500, nodeid=None)
+                              uri_extension_size_max=500, nodeid=None)
         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
 
     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
@@ -156,13 +156,13 @@ class BucketProxy(unittest.TestCase):
         bw, rb, sharefname = self.make_bucket(name, sharesize)
         bp = wbp_class(rb,
                        data_size=95,
-                       segment_size=25,
+                       block_size=25,
                        num_segments=4,
                        num_share_hashes=3,
-                       uri_extension_size=len(uri_extension),
+                       uri_extension_size_max=len(uri_extension),
                        nodeid=None)
 
-        d = bp.start()
+        d = bp.put_header()
         d.addCallback(lambda res: bp.put_block(0, "a"*25))
         d.addCallback(lambda res: bp.put_block(1, "b"*25))
         d.addCallback(lambda res: bp.put_block(2, "c"*25))
@@ -182,21 +182,19 @@ class BucketProxy(unittest.TestCase):
             self.failUnless("to peer" in repr(rbp))
             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
 
-            d1 = rbp.startIfNecessary()
-            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
-            d1.addCallback(lambda res: rbp.get_block(0))
+            d1 = rbp.get_block_data(0, 25, 25)
             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
-            d1.addCallback(lambda res: rbp.get_block(1))
+            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
-            d1.addCallback(lambda res: rbp.get_block(2))
+            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
-            d1.addCallback(lambda res: rbp.get_block(3))
+            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
 
             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
             d1.addCallback(lambda res:
                            self.failUnlessEqual(res, crypttext_hashes))
-            d1.addCallback(lambda res: rbp.get_block_hashes())
+            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
             d1.addCallback(lambda res: rbp.get_share_hashes())
             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))