The code for validating the share hash tree and the block hash tree has been rewritten to make sure it handles all cases, to share metadata about the file (such as the share hash tree, block hash trees, and UEB) among different share downloads, and not to require hashes to be stored on the server unnecessarily, such as the roots of the block hash trees (not needed since they are also the leaves of the share hash tree), and the root of the share hash tree (not needed since it is also included in the UEB). It also passes the latest tests, including those that check that corrupted shares are handled well.
ValidatedReadBucketProxy takes a share_hash_tree argument to its constructor, which is a reference to a share hash tree shared by all ValidatedReadBucketProxies for that immutable file download.
ValidatedReadBucketProxy requires the block_size and share_size to be provided in its constructor, and it then uses those to compute the offsets and lengths of blocks when it needs them, instead of reading those values out of the share. The user of ValidatedReadBucketProxy therefore has to have first used a ValidatedExtendedURIProxy to compute those two values from the validated contents of the URI. This pleasingly simplifies safety analysis: the client knows which span of bytes corresponds to a given block from the validated URI data, rather than from the unvalidated data stored on the storage server. It also simplifies unit testing of verifier/repairer, because now it doesn't care about the contents of the "share size" and "block size" fields in the share. It does not relieve the need for share data v2 layout, because we still need to store and retrieve the offsets of the fields which come after the share data, therefore we still need to use share data v2 with its 8-byte fields if we want to store share data larger than about 2^32 bytes.
Callers now specify which subset of the block hashes and share hashes they need while downloading a particular share. In the future this will hopefully be used to fetch only that subset, for network efficiency, but currently all of them are fetched, regardless of which subset is specified.
ReadBucketProxy hides the question of whether it has "started" or not (sent a request to the server to get metadata) from its user.
Download is optimized to do as few roundtrips and as few requests as possible, hopefully speeding up download a bit.
from foolscap import DeadReferenceError
from foolscap.eventual import eventually
-from allmydata.util import base32, mathutil, hashutil, log
+from allmydata.util import base32, deferredutil, mathutil, hashutil, log
from allmydata.util.assertutil import _assert, precondition
from allmydata.util.rrefutil import ServerFailure
from allmydata import codec, hashtree, uri
if not self._validatedthingproxies:
raise NotEnoughSharesError("ran out of peers, last error was %s" % (f,))
# try again with a different one
- return self._try_the_next_one()
+ d = self._try_the_next_one()
+ return d
def _try_the_next_one(self):
vtp = self._validatedthingproxies.pop(0)
return self
def start(self):
- d = self._readbucketproxy.startIfNecessary()
- d.addCallback(lambda ignored: self._readbucketproxy.get_crypttext_hashes())
+ d = self._readbucketproxy.get_crypttext_hashes()
d.addCallback(self._validate)
return d
""" Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
it. Returns a deferred which is called back with self once the fetch is successful, or
is erred back if it fails. """
- d = self._readbucketproxy.startIfNecessary()
- d.addCallback(lambda ignored: self._readbucketproxy.get_uri_extension())
+ d = self._readbucketproxy.get_uri_extension()
d.addCallback(self._check_integrity)
d.addCallback(self._parse_and_validate)
return d
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
- """I am a front-end for a remote storage bucket, responsible for
- retrieving and validating data from that bucket.
+ """I am a front-end for a remote storage bucket, responsible for retrieving and validating
+ data from that bucket.
My get_block() method is used by BlockDownloaders.
"""
- def __init__(self, sharenum, bucket,
- share_hash_tree, share_root_hash,
- num_blocks):
- """ share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """
+ def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
+ """ share_hash_tree is required to have already been initialized with the root hash
+ (the number-0 hash), using the share_root_hash from the UEB """
+ precondition(share_hash_tree[0] is not None, share_hash_tree)
prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
self.sharenum = sharenum
self.bucket = bucket
- self._share_hash = None # None means not validated yet
self.share_hash_tree = share_hash_tree
- self._share_root_hash = share_root_hash
- self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
- self.started = False
+ self.num_blocks = num_blocks
+ self.block_size = block_size
+ self.share_size = share_size
+ self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
def get_block(self, blocknum):
- if not self.started:
- d = self.bucket.start()
- def _started(res):
- self.started = True
- return self.get_block(blocknum)
- d.addCallback(_started)
- return d
-
# the first time we use this bucket, we need to fetch enough elements
# of the share hash tree to validate it from our share hash up to the
# hashroot.
- if not self._share_hash:
+ if self.share_hash_tree.needed_hashes(self.sharenum):
d1 = self.bucket.get_share_hashes()
else:
d1 = defer.succeed([])
- # we might need to grab some elements of our block hash tree, to
- # validate the requested block up to the share hash
- needed = self.block_hash_tree.needed_hashes(blocknum)
- if needed:
- # TODO: get fewer hashes, use get_block_hashes(needed)
- d2 = self.bucket.get_block_hashes()
- else:
- d2 = defer.succeed([])
-
- d3 = self.bucket.get_block(blocknum)
+ # We might need to grab some elements of our block hash tree, to
+ # validate the requested block up to the share hash.
+ blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
+ # We don't need the root of the block hash tree, as that comes in the share tree.
+ blockhashesneeded.discard(0)
+ d2 = self.bucket.get_block_hashes(blockhashesneeded)
- d = defer.gatherResults([d1, d2, d3])
- d.addCallback(self._got_data, blocknum)
- return d
+ if blocknum < self.num_blocks-1:
+ thisblocksize = self.block_size
+ else:
+ thisblocksize = self.share_size % self.block_size
+ if thisblocksize == 0:
+ thisblocksize = self.block_size
+ d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
+
+ dl = deferredutil.gatherResults([d1, d2, d3])
+ dl.addCallback(self._got_data, blocknum)
+ return dl
+
+ def _got_data(self, results, blocknum):
+ precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
+ sharehashes, blockhashes, blockdata = results
+ try:
+ sharehashes = dict(sharehashes)
+ except ValueError, le:
+ le.args = tuple(le.args + (sharehashes,))
+ raise
+ blockhashes = dict(enumerate(blockhashes))
- def _got_data(self, res, blocknum):
- sharehashes, blockhashes, blockdata = res
- blockhash = None # to make logging it safe
+ candidate_share_hash = None # in case we log it in the except block below
+ blockhash = None # in case we log it in the except block below
try:
- if not self._share_hash:
- sh = dict(sharehashes)
- sh[0] = self._share_root_hash # always use our own root, from the URI
- sht = self.share_hash_tree
- if sht.get_leaf_index(self.sharenum) not in sh:
- raise hashtree.NotEnoughHashesError
- sht.set_hashes(sh)
- self._share_hash = sht.get_leaf(self.sharenum)
+ if self.share_hash_tree.needed_hashes(self.sharenum):
+ # This will raise exception if the values being passed do not match the root
+ # node of self.share_hash_tree.
+ self.share_hash_tree.set_hashes(sharehashes)
+
+ # To validate a block we need the root of the block hash tree, which is also one of
+ # the leafs of the share hash tree, and is called "the share hash".
+ if not self.block_hash_tree[0]: # empty -- no root node yet
+ # Get the share hash from the share hash tree.
+ share_hash = self.share_hash_tree.get_leaf(self.sharenum)
+ if not share_hash:
+ raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
+ self.block_hash_tree.set_hashes({0: share_hash})
+
+ if self.block_hash_tree.needed_hashes(blocknum):
+ self.block_hash_tree.set_hashes(blockhashes)
blockhash = hashutil.block_hash(blockdata)
+ self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
#self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
# "%r .. %r: %s" %
# (self.sharenum, blocknum, len(blockdata),
# blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
- # we always validate the blockhash
- bh = dict(enumerate(blockhashes))
- # replace blockhash root with validated value
- bh[0] = self._share_hash
- self.block_hash_tree.set_hashes(bh, {blocknum: blockhash})
-
except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
# log.WEIRD: indicates undetected disk/network error, or more
# likely a programming error
self.log("hash failure in block=%d, shnum=%d on %s" %
(blocknum, self.sharenum, self.bucket))
- if self._share_hash:
+ if self.block_hash_tree.needed_hashes(blocknum):
self.log(""" failure occurred when checking the block_hash_tree.
This suggests that either the block data was bad, or that the
block hashes we received along with it were bad.""")
self.log(""" the failure probably occurred when checking the
share_hash_tree, which suggests that the share hashes we
received from the remote peer were bad.""")
- self.log(" have self._share_hash: %s" % bool(self._share_hash))
+ self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
self.log(" block length: %d" % len(blockdata))
self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
if len(blockdata) < 100:
else:
self.log(" block data start/end: %r .. %r" %
(blockdata[:50], blockdata[-50:]))
- self.log(" root hash: %s" % base32.b2a(self._share_root_hash))
self.log(" share hash tree:\n" + self.share_hash_tree.dump())
self.log(" block hash tree:\n" + self.block_hash_tree.dump())
lines = []
- for i,h in sorted(sharehashes):
+ for i,h in sorted(sharehashes.items()):
lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
lines = []
- for i,h in enumerate(blockhashes):
+ for i,h in blockhashes.items():
lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
raise BadOrMissingHash(le)
self.parent.hold_block(self.blocknum, data)
def _got_block_error(self, f):
- failtype = f.trap(ServerFailure, IntegrityCheckReject, layout.LayoutInvalid)
+ failtype = f.trap(ServerFailure, IntegrityCheckReject, layout.LayoutInvalid, layout.ShareVersionIncompatible)
if f.check(ServerFailure):
level = log.UNUSUAL
else:
self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
+ self._share_hash_tree = None
self._crypttext_hash_tree = None
def pauseProducing(self):
self._current_segnum = 0
- self._share_hashtree = hashtree.IncompleteHashTree(self._uri.total_shares)
- self._share_hashtree.set_hashes({0: vup.share_root_hash})
+ self._share_hash_tree = hashtree.IncompleteHashTree(self._uri.total_shares)
+ self._share_hash_tree.set_hashes({0: vup.share_root_hash})
self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
def _download_all_segments(self, res):
for sharenum, bucket in self._share_buckets:
- vbucket = ValidatedReadBucketProxy(sharenum, bucket,
- self._share_hashtree,
- self._vup.share_root_hash,
- self._vup.num_segments)
- s = self._share_vbuckets.setdefault(sharenum, set())
- s.add(vbucket)
+ vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
+ self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
# after the above code, self._share_vbuckets contains enough
# buckets to complete the download, and some extra ones to
self.set_status("Starting shareholders")
dl = []
for shareid in self.landlords:
- d = self.landlords[shareid].start()
+ d = self.landlords[shareid].put_header()
d.addErrback(self._remove_shareholder, shareid, "start")
dl.append(d)
return self._gather_responses(dl)
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
-from allmydata.util import mathutil, idlib
+from allmydata.util import log, mathutil, idlib, observer
from allmydata.util.assertutil import precondition
from allmydata import storage
the beginning of the share data.
0x00: version number (=00 00 00 01)
-0x04: segment size
-0x08: data size
+0x04: block size # See Footnote 1 below.
+0x08: share data size # See Footnote 1 below.
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree UNUSED
0x14: offset of crypttext_hash_tree
? : start of block_hashes
? : start of share_hashes
each share_hash is written as a two-byte (big-endian) hashnum
- followed by the 32-byte SHA-256 hash. We only store the hashes
+ followed by the 32-byte SHA-256 hash. We store only the hashes
necessary to validate the share hash root
? : start of uri_extension_length (four-byte big-endian value)
? : start of uri_extension
limitations described in #346.
0x00: version number (=00 00 00 02)
-0x04: segment size
-0x0c: data size
+0x04: block size # See Footnote 1 below.
+0x0c: share data size # See Footnote 1 below.
0x14: offset of data (=00 00 00 00 00 00 00 44)
0x1c: offset of plaintext_hash_tree UNUSED
0x24: offset of crypttext_hash_tree
? : start of uri_extension_length (eight-byte big-endian value)
"""
+# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but they are still
+# provided when writing so that older versions of Tahoe can read them.
+
def allocated_size(data_size, num_segments, num_share_hashes,
- uri_extension_size):
+ uri_extension_size_max):
wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
- uri_extension_size, None)
+ uri_extension_size_max, None)
uri_extension_starts_at = wbp._offsets['uri_extension']
- return uri_extension_starts_at + wbp.fieldsize + uri_extension_size
+ return uri_extension_starts_at + wbp.fieldsize + uri_extension_size_max
class WriteBucketProxy:
implements(IStorageBucketWriter)
fieldsize = 4
fieldstruct = ">L"
- def __init__(self, rref, data_size, segment_size, num_segments,
- num_share_hashes, uri_extension_size, nodeid):
+ def __init__(self, rref, data_size, block_size, num_segments,
+ num_share_hashes, uri_extension_size_max, nodeid):
self._rref = rref
self._data_size = data_size
- self._segment_size = segment_size
+ self._block_size = block_size
self._num_segments = num_segments
self._nodeid = nodeid
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
- self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+ self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
# we commit to not sending a uri extension larger than this
- self._uri_extension_size = uri_extension_size
+ self._uri_extension_size_max = uri_extension_size_max
- self._create_offsets(segment_size, data_size)
+ self._create_offsets(block_size, data_size)
- def _create_offsets(self, segment_size, data_size):
- if segment_size >= 2**32 or data_size >= 2**32:
+ def _create_offsets(self, block_size, data_size):
+ if block_size >= 2**32 or data_size >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
offsets = self._offsets = {}
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
- x += self._share_hash_size
+ x += self._share_hashtree_size
offsets['uri_extension'] = x
if x >= 2**32:
offset_data = struct.pack(">LLLLLLLLL",
1, # version number
- segment_size,
+ block_size,
data_size,
offsets['data'],
offsets['plaintext_hash_tree'], # UNUSED
nodeid_s = "[None]"
return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s
- def start(self):
+ def put_header(self):
return self._write(0, self._offset_data)
def put_block(self, segmentnum, data):
- offset = self._offsets['data'] + segmentnum * self._segment_size
+ offset = self._offsets['data'] + segmentnum * self._block_size
assert offset + len(data) <= self._offsets['uri_extension']
assert isinstance(data, str)
if segmentnum < self._num_segments-1:
- precondition(len(data) == self._segment_size,
- len(data), self._segment_size)
+ precondition(len(data) == self._block_size,
+ len(data), self._block_size)
else:
precondition(len(data) == (self._data_size -
- (self._segment_size *
+ (self._block_size *
(self._num_segments - 1))),
- len(data), self._segment_size)
+ len(data), self._block_size)
return self._write(offset, data)
def put_crypttext_hashes(self, hashes):
assert isinstance(sharehashes, list)
data = "".join([struct.pack(">H", hashnum) + hashvalue
for hashnum,hashvalue in sharehashes])
- precondition(len(data) == self._share_hash_size,
- len(data), self._share_hash_size)
+ precondition(len(data) == self._share_hashtree_size,
+ len(data), self._share_hashtree_size)
precondition(offset + len(data) <= self._offsets['uri_extension'],
offset, len(data), offset+len(data),
self._offsets['uri_extension'])
def put_uri_extension(self, data):
offset = self._offsets['uri_extension']
assert isinstance(data, str)
- precondition(len(data) <= self._uri_extension_size,
- len(data), self._uri_extension_size)
+ precondition(len(data) <= self._uri_extension_size_max,
+ len(data), self._uri_extension_size_max)
length = struct.pack(self.fieldstruct, len(data))
return self._write(offset, length+data)
fieldsize = 8
fieldstruct = ">Q"
- def _create_offsets(self, segment_size, data_size):
- if segment_size >= 2**64 or data_size >= 2**64:
+ def _create_offsets(self, block_size, data_size):
+ if block_size >= 2**64 or data_size >= 2**64:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
offsets = self._offsets = {}
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
- x += self._share_hash_size
+ x += self._share_hashtree_size
offsets['uri_extension'] = x
if x >= 2**64:
offset_data = struct.pack(">LQQQQQQQQ",
2, # version number
- segment_size,
+ block_size,
data_size,
offsets['data'],
offsets['plaintext_hash_tree'], # UNUSED
class ReadBucketProxy:
implements(IStorageBucketReader)
+
+ MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
+
def __init__(self, rref, peerid, storage_index):
self._rref = rref
self._peerid = peerid
peer_id_s = idlib.shortnodeid_b2a(peerid)
storage_index_s = storage.si_b2a(storage_index)
- self._reprstr = "<ReadBucketProxy to peer [%s] SI %s>" % (peer_id_s, storage_index_s)
- self._started = False
+ self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
+ self._started = False # sent request to server
+ self._ready = observer.OneShotObserverList() # got response from server
def get_peerid(self):
return self._peerid
def __repr__(self):
return self._reprstr
- def startIfNecessary(self):
- if self._started:
- return defer.succeed(self)
- d = self.start()
- d.addCallback(lambda res: self)
- return d
-
- def start(self):
- # TODO: for small shares, read the whole bucket in start()
- d = self._read(0, 0x44)
+ def _start_if_needed(self):
+ """ Returns a deferred that will be fired when I'm ready to return data, or errbacks if
+ the starting (header reading and parsing) process fails."""
+ if not self._started:
+ self._start()
+ return self._ready.when_fired()
+
+ def _start(self):
+ self._started = True
+ # TODO: for small shares, read the whole bucket in _start()
+ d = self._fetch_header()
d.addCallback(self._parse_offsets)
- def _started(res):
- self._started = True
- return res
- d.addCallback(_started)
+ d.addCallback(self._fetch_sharehashtree_and_ueb)
+ d.addCallback(self._parse_sharehashtree_and_ueb)
+ def _fail_waiters(f):
+ self._ready.fire(f)
+ d.addErrback(_fail_waiters)
return d
+ def _fetch_header(self):
+ return self._read(0, 0x44)
+
def _parse_offsets(self, data):
precondition(len(data) >= 0x4)
self._offsets = {}
x = 0x0c
fieldsize = 0x4
fieldstruct = ">L"
- (self._segment_size,
- self._data_size) = struct.unpack(">LL", data[0x4:0xc])
else:
precondition(len(data) >= 0x44)
x = 0x14
fieldsize = 0x8
fieldstruct = ">Q"
- (self._segment_size,
- self._data_size) = struct.unpack(">QQ", data[0x4:0x14])
self._version = version
self._fieldsize = fieldsize
self._offsets[field] = offset
return self._offsets
- def get_block(self, blocknum):
- num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
- if blocknum < num_segments-1:
- size = self._segment_size
- else:
- size = self._data_size % self._segment_size
- if size == 0:
- size = self._segment_size
- offset = self._offsets['data'] + blocknum * self._segment_size
- return self._read(offset, size)
+ def _fetch_sharehashtree_and_ueb(self, offsets):
+ sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
+ return self._read(offsets['share_hashes'], self.MAX_UEB_SIZE+sharehashtree_size)
+
+ def _parse_sharehashtree_and_ueb(self, data):
+ sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
+ if len(data) < sharehashtree_size:
+ raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
+ if sharehashtree_size % (2+HASH_SIZE) != 0:
+ raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
+ self._share_hashes = []
+ for i in range(0, sharehashtree_size, 2+HASH_SIZE):
+ hashnum = struct.unpack(">H", data[i:i+2])[0]
+ hashvalue = data[i+2:i+2+HASH_SIZE]
+ self._share_hashes.append( (hashnum, hashvalue) )
+
+ i = self._offsets['uri_extension']-self._offsets['share_hashes']
+ if len(data) < i+self._fieldsize:
+ raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
+ length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
+ self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
+
+ self._ready.fire(self)
+
+ def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
+ offset = self._offsets['data'] + blocknum * blocksize
+ return self._read(offset, thisblocksize)
+
+ def get_block_data(self, blocknum, blocksize, thisblocksize):
+ d = self._start_if_needed()
+ d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
+ return d
def _str2l(self, s):
""" split string (pulled from storage) into a list of blockids """
return [ s[i:i+HASH_SIZE]
for i in range(0, len(s), HASH_SIZE) ]
- def get_crypttext_hashes(self):
+ def _get_crypttext_hashes(self, unused=None):
offset = self._offsets['crypttext_hash_tree']
size = self._offsets['block_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
- def get_block_hashes(self):
+ def get_crypttext_hashes(self):
+ d = self._start_if_needed()
+ d.addCallback(self._get_crypttext_hashes)
+ return d
+
+ def _get_block_hashes(self, unused=None, at_least_these=()):
+ # TODO: fetch only at_least_these instead of all of them.
offset = self._offsets['block_hashes']
size = self._offsets['share_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
+ def get_block_hashes(self, at_least_these=()):
+ if at_least_these:
+ d = self._start_if_needed()
+ d.addCallback(self._get_block_hashes, at_least_these)
+ return d
+ else:
+ return defer.succeed([])
+
+ def _get_share_hashes(self, unused=None):
+ return self._share_hashes
+
def get_share_hashes(self):
- offset = self._offsets['share_hashes']
- size = self._offsets['uri_extension'] - offset
- assert size % (2+HASH_SIZE) == 0
- d = self._read(offset, size)
- def _unpack_share_hashes(data):
- assert len(data) == size
- hashes = []
- for i in range(0, size, 2+HASH_SIZE):
- hashnum = struct.unpack(">H", data[i:i+2])[0]
- hashvalue = data[i+2:i+2+HASH_SIZE]
- hashes.append( (hashnum, hashvalue) )
- return hashes
- d.addCallback(_unpack_share_hashes)
+ d = self._start_if_needed()
+ d.addCallback(self._get_share_hashes)
return d
+ def _get_uri_extension(self, unused=None):
+ return self._ueb_data
+
def get_uri_extension(self):
- offset = self._offsets['uri_extension']
- d = self._read(offset, self._fieldsize)
- def _got_length(data):
- if len(data) != self._fieldsize:
- raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
- length = struct.unpack(self._fieldstruct, data)[0]
- if length >= 2**31:
- # URI extension blocks are around 419 bytes long, so this must be corrupted.
- # Anyway, the foolscap interface schema for "read" will not allow >= 2**31 bytes
- # length.
- raise RidiculouslyLargeURIExtensionBlock(length)
-
- return self._read(offset+self._fieldsize, length)
- d.addCallback(_got_length)
+ d = self._start_if_needed()
+ d.addCallback(self._get_uri_extension)
return d
def _read(self, offset, length):
class IStorageBucketReader(Interface):
- def get_block(blocknum=int):
+ def get_block_data(blocknum=int, blocksize=int, size=int):
"""Most blocks will be the same size. The last block might be shorter
than the others.
@return: ListOf(Hash)
"""
- def get_block_hashes():
+ def get_block_hashes(at_least_these=SetOf(int)):
"""
@return: ListOf(Hash)
"""
- def get_share_hashes():
+ def get_share_hashes(at_least_these=SetOf(int)):
"""
@return: ListOf(TupleOf(int, Hash))
"""
return
b,peerid = self._readers.pop()
rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
- d = rbp.startIfNecessary()
- d.addCallback(lambda res: rbp.get_uri_extension())
+ d = rbp.get_uri_extension()
d.addCallback(self._got_uri_extension)
d.addErrback(self._ueb_error)
return d
print >>out, "%20s: %s" % ("verify-cap", verify_cap)
sizes = {}
- sizes['data'] = bp._data_size
+ sizes['data'] = (offsets['plaintext_hash_tree'] -
+ offsets['data'])
sizes['validation'] = (offsets['uri_extension'] -
offsets['plaintext_hash_tree'])
sizes['uri-extension'] = len(UEB_data)
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
+ ReadBucketProxy.__init__(self, "", "", "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = storage.ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
- call(bp.start)
expiration_time = min( [lease.expiration_time
for lease in sf.iter_leases()] )
def get_peerid(self):
return "peerid"
- def startIfNecessary(self):
- return defer.succeed(self)
- def start(self):
+ def _start(self):
if self.mode == "lost-early":
f = Failure(LostPeerError("I went away early"))
return eventual.fireEventually(f)
return defer.succeed(self)
+ def put_header(self):
+ return self._start()
+
def put_block(self, segmentnum, data):
if self.mode == "lost-early":
f = Failure(LostPeerError("I went away early"))
def abort(self):
return defer.succeed(None)
- def get_block(self, blocknum):
- def _try():
+ def get_block_data(self, blocknum, blocksize, size):
+ d = self._start()
+ def _try(unused=None):
assert isinstance(blocknum, (int, long))
if self.mode == "bad block":
return flip_bit(self.blocks[blocknum])
return self.blocks[blocknum]
- return defer.maybeDeferred(_try)
+ d.addCallback(_try)
+ return d
def get_plaintext_hashes(self):
- def _try():
+ d = self._start()
+ def _try(unused=None):
hashes = self.plaintext_hashes[:]
return hashes
- return defer.maybeDeferred(_try)
+ d.addCallback(_try)
+ return d
def get_crypttext_hashes(self):
- def _try():
+ d = self._start()
+ def _try(unused=None):
hashes = self.crypttext_hashes[:]
if self.mode == "bad crypttext hashroot":
hashes[0] = flip_bit(hashes[0])
if self.mode == "bad crypttext hash":
hashes[1] = flip_bit(hashes[1])
return hashes
- return defer.maybeDeferred(_try)
+ d.addCallback(_try)
+ return d
- def get_block_hashes(self):
- def _try():
+ def get_block_hashes(self, at_least_these=()):
+ d = self._start()
+ def _try(unused=None):
if self.mode == "bad blockhash":
hashes = self.block_hashes[:]
hashes[1] = flip_bit(hashes[1])
return hashes
return self.block_hashes
- return defer.maybeDeferred(_try)
+ d.addCallback(_try)
+ return d
- def get_share_hashes(self):
- def _try():
+ def get_share_hashes(self, at_least_these=()):
+ d = self._start()
+ def _try(unused=None):
if self.mode == "bad sharehash":
hashes = self.share_hashes[:]
hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
# download.py is supposed to guard against this case.
return []
return self.share_hashes
- return defer.maybeDeferred(_try)
+ d.addCallback(_try)
+ return d
def get_uri_extension(self):
- def _try():
+ d = self._start()
+ def _try(unused=None):
if self.mode == "bad uri_extension":
return flip_bit(self.uri_extension)
return self.uri_extension
- return defer.maybeDeferred(_try)
+ d.addCallback(_try)
+ return d
def make_data(length):
# the first 7 servers have bad block hashes, so the sharehash tree
# will not validate, and the download will fail
modemap = dict([(i, "bad sharehash")
- for i in range(7)]
- + [(i, "good")
- for i in range(7, 10)])
+ for i in range(10)])
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure))
return self.send_and_recover((4,8,10), bucket_modes=modemap)
def test_missing_sharehashes_failure(self):
- # the first 7 servers are missing their sharehashes, so the
- # sharehash tree will not validate, and the download will fail
+ # all servers are missing their sharehashes, so the sharehash tree will not validate,
+ # and the download will fail
modemap = dict([(i, "missing sharehash")
- for i in range(7)]
- + [(i, "good")
- for i in range(7, 10)])
+ for i in range(10)])
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure), res)
self.failUnlessRaises(IndexError, ht.get_leaf, 8)
self.failUnlessEqual(ht.get_leaf_index(0), 7)
+ def test_needed_hashes(self):
+ ht = hashtree.IncompleteHashTree(8)
+ self.failUnlessEqual(ht.needed_hashes(0), set([8, 4, 2]))
+ self.failUnlessEqual(ht.needed_hashes(0, True), set([7, 8, 4, 2]))
+ self.failUnlessEqual(ht.needed_hashes(1), set([7, 4, 2]))
+ self.failUnlessEqual(ht.needed_hashes(7), set([13, 5, 1]))
+ self.failUnlessEqual(ht.needed_hashes(7, False), set([13, 5, 1]))
+ self.failUnlessEqual(ht.needed_hashes(7, True), set([14, 13, 5, 1]))
+ ht = hashtree.IncompleteHashTree(1)
+ self.failUnlessEqual(ht.needed_hashes(0), set([]))
+ ht = hashtree.IncompleteHashTree(6)
+ self.failUnlessEqual(ht.needed_hashes(0), set([8, 4, 2]))
+ self.failUnlessEqual(ht.needed_hashes(0, True), set([7, 8, 4, 2]))
+ self.failUnlessEqual(ht.needed_hashes(1), set([7, 4, 2]))
+ self.failUnlessEqual(ht.needed_hashes(5), set([11, 6, 1]))
+ self.failUnlessEqual(ht.needed_hashes(5, False), set([11, 6, 1]))
+ self.failUnlessEqual(ht.needed_hashes(5, True), set([12, 11, 6, 1]))
+
def test_check(self):
# first create a complete hash tree
ht = make_tree(6)
iht.set_hashes(chain, leaves={4: tagged_hash("tag", "4")})
except hashtree.BadHashError, e:
self.fail("bad hash: %s" % e)
-
before_download_reads = self._count_reads()
def _after_download(unused=None):
after_download_reads = self._count_reads()
- # To pass this test, you have to download the file using only 10 reads to get the
- # UEB (in parallel from all shares), plus one read for each of the 3 shares.
- self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
+ # To pass this test, you have to download the file using only 10 reads total: 3 to
+ # get the headers from each share, 3 to get the share hash trees and uebs from each
+ # share, 1 to get the crypttext hashes, and 3 to get the block data from each share.
+ self.failIf(after_download_reads-before_download_reads > 10, (after_download_reads, before_download_reads))
d.addCallback(self._download_and_check_plaintext)
d.addCallback(_after_download)
return d
d.addCallback(self._download_and_check_plaintext)
d.addCallback(_after_download)
return d
- test_download_from_only_3_remaining_shares.todo = "I think this test is failing due to the downloader code not knowing how to handle URI corruption and keeping going. I'm going to commit new downloader code soon, and then see if this test starts passing."
def test_download_abort_if_too_many_missing_shares(self):
""" Test that download gives up quickly when it realizes there aren't enough shares out
bw, rb, sharefname = self.make_bucket("test_create", 500)
bp = WriteBucketProxy(rb,
data_size=300,
- segment_size=10,
+ block_size=10,
num_segments=5,
num_share_hashes=3,
- uri_extension_size=500, nodeid=None)
+ uri_extension_size_max=500, nodeid=None)
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
bw, rb, sharefname = self.make_bucket(name, sharesize)
bp = wbp_class(rb,
data_size=95,
- segment_size=25,
+ block_size=25,
num_segments=4,
num_share_hashes=3,
- uri_extension_size=len(uri_extension),
+ uri_extension_size_max=len(uri_extension),
nodeid=None)
- d = bp.start()
+ d = bp.put_header()
d.addCallback(lambda res: bp.put_block(0, "a"*25))
d.addCallback(lambda res: bp.put_block(1, "b"*25))
d.addCallback(lambda res: bp.put_block(2, "c"*25))
self.failUnless("to peer" in repr(rbp))
self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
- d1 = rbp.startIfNecessary()
- d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
- d1.addCallback(lambda res: rbp.get_block(0))
+ d1 = rbp.get_block_data(0, 25, 25)
d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
- d1.addCallback(lambda res: rbp.get_block(1))
+ d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
- d1.addCallback(lambda res: rbp.get_block(2))
+ d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
- d1.addCallback(lambda res: rbp.get_block(3))
+ d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
d1.addCallback(lambda res: rbp.get_crypttext_hashes())
d1.addCallback(lambda res:
self.failUnlessEqual(res, crypttext_hashes))
- d1.addCallback(lambda res: rbp.get_block_hashes())
+ d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
d1.addCallback(lambda res: rbp.get_share_hashes())
d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))