Implement enough of chunk.IncompleteHashTree to be usable.
Rearrange download: all block/hash requests now go through
a ValidatedBucket instance, which is responsible for retrieving
and verifying hashes before providing validated data. Download
was changed to use ValidatedBuckets everywhere instead of
unwrapped RIBucketReader references.
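
Roughly, the new per-block flow looks like this (a sketch; 'process_data'
stands in for whatever callback consumes the validated block):

    vb = ValidatedBucket(sharenum, bucket, share_hash_tree, num_blocks)
    d = vb.get_block(blocknum)    # fetches the block plus any needed hashes
    d.addCallback(process_data)   # fires with the data only if it validated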
or eat your children, but it might. Use at your own risk.
"""
+from allmydata.util import idlib
from allmydata.util.hashutil import tagged_hash, tagged_pair_hash
__version__ = '1.0.0-allmydata'
here = self.parent(here)
return needed
+ def depth_first(self, i=0):
+ yield i, 0
+ try:
+ for child,childdepth in self.depth_first(self.lchild(i)):
+ yield child, childdepth+1
+ except IndexError:
+ pass
+ try:
+ for child,childdepth in self.depth_first(self.rchild(i)):
+ yield child, childdepth+1
+ except IndexError:
+ pass
+
+ def dump(self):
+ lines = []
+ for i,depth in self.depth_first():
+ lines.append("%s%3d: %s" % (" "*depth, i, idlib.b2a_or_none(self[i])))
+ return "\n".join(lines) + "\n"
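+
+    # Sample dump() output for a 4-leaf tree (nodes 3..6 are the leaves;
+    # hashes we do not have yet print as "None"):
+    #   0: <roothash>
+    #    1: <hash>
+    #     3: <leafhash 0>
+    #     4: <leafhash 1>
+    #    2: <hash>
+    #     5: <leafhash 2>
+    #     6: <leafhash 3>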
+
def empty_leaf_hash(i):
return tagged_hash('Merkle tree empty leaf', "%d" % i)
def pair_hash(a, b):
rows.reverse()
self[:] = sum(rows, [])
- def needed_hashes(self, leafnum):
- hashnum = self.first_leaf_num + leafnum
- maybe_needed = self.needed_for(hashnum)
- maybe_needed += [0] # need the root too
+ def needed_hashes(self, hashes=[], leaves=[]):
+ hashnums = set(list(hashes))
+ for leafnum in leaves:
+ hashnums.add(self.first_leaf_num + leafnum)
+ maybe_needed = set()
+ for hashnum in hashnums:
+ maybe_needed.update(self.needed_for(hashnum))
+ maybe_needed.add(0) # need the root too
return set([i for i in maybe_needed if self[i] is None])
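+
+    # For example, in a freshly-created 8-leaf IncompleteHashTree,
+    # needed_hashes(leaves=[0]) returns set([0, 2, 4, 8]): the leaf's
+    # sibling (8), its uncles (4 and 2), and the root (0).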
- def set_hash(self, i, newhash):
- # note that we don't attempt to validate these
- self[i] = newhash
-
- def set_leaf(self, leafnum, leafhash):
- hashnum = self.first_leaf_num + leafnum
- needed = self.needed_hashes(leafnum)
- if needed:
- msg = "we need hashes " + ",".join([str(i) for i in needed])
- raise NotEnoughHashesError(msg)
- assert self[0] is not None # we can't validate without a root
- added = set() # we'll remove these if the check fails
- self[hashnum] = leafhash
- added.add(hashnum)
- # now propagate hash checks upwards until we reach the root
- here = hashnum
- while here != 0:
- us = [here, self.sibling(here)]
- us.sort()
- leftnum, rightnum = us
- lefthash = self[leftnum]
- righthash = self[rightnum]
- parent = self.parent(here)
- parenthash = self[parent]
-
- ourhash = pair_hash(lefthash, righthash)
- if parenthash is not None:
- if ourhash != parenthash:
- for i in added:
- self[i] = None
- raise BadHashError("h([%d]+[%d]) != h[%d]" % (leftnum, rightnum,
- parent))
- else:
- self[parent] = ourhash
- added.add(parent)
- here = self.parent(here)
+ def set_hashes(self, hashes={}, leaves={}, must_validate=False):
+ """Add a bunch of hashes to the tree.
+
+ I will validate these to the best of my ability. If I already have a copy
+ of any of the new hashes, the new values must equal the existing ones, or
+ I will raise BadHashError. If adding a hash allows me to compute a parent
+        I will raise BadHashError. If adding a hash allows me to compute a
+        parent hash, the computed value must match any parent hash that is
+        already in place, or I will raise BadHashError. If I
+ add, leaving my state exactly the same as before I was called. If I
+ return successfully, I will remember all those hashes.
+
+ If every hash that was added was validated, I will return True. If some
+ could not be validated because I did not have enough parent hashes, I
+        will return False. As a result, if I am called with a leaf hash while
+        the root hash is already set, I will return True if and only if that
+        leaf hash can be validated against the root.
+
+        If must_validate is True, I will raise NotEnoughHashesError instead of
+        returning False. In that case I will likewise forget about all the
+        hashes that you tried to add, leaving my state unchanged.
+
+        'leaves' is a dictionary whose keys are 'leaf index' values, which
+        range from 0 (the left-most leaf) to num_leaves-1 (the right-most
+        leaf) and which form the base of the tree. 'hashes' uses 'hash index'
+        values, which range from 0 (the root of the tree) to 2*num_leaves-2
+        (the right-most leaf). leaf[i] is the same as hash[num_leaves-1+i].
+
+        The best way to use me is to obtain the root hash from some 'good'
+        channel and call set_hashes({0: root}). Then use the 'bad' channel to
+        obtain data block 0 and the corresponding hash chain (a dict with the
+        same hashes that needed_hashes(leaves=[0]) tells you, e.g. {2:h2,
+        4:h4, 8:h8} when len(L)=8). Hash the data block to create leaf0. Then
+        call::
+
+ good = iht.set_hashes(hashes=hashchain, leaves={0: leaf0})
+
+        If 'good' is True, the data block was valid. If 'good' is False, the
+        hash chain was missing some of the hashes we needed, so we don't know
+        whether the data block was good or bad. If set_hashes() raises an
+        exception, either the data block or one of the received hashes was
+        corrupted.
+ """
+
+ assert isinstance(hashes, dict)
+ assert isinstance(leaves, dict)
+ new_hashes = hashes.copy()
+ for leafnum,leafhash in leaves.iteritems():
+ hashnum = self.first_leaf_num + leafnum
+ if hashnum in new_hashes:
+ assert new_hashes[hashnum] == leafhash
+ new_hashes[hashnum] = leafhash
+
+ added = set() # we'll remove these if the check fails
- return None
+ try:
+ # first we provisionally add all hashes to the tree, comparing any
+ # duplicates
+ for i in new_hashes:
+ if self[i]:
+ if self[i] != new_hashes[i]:
+ raise BadHashError("new hash does not match existing hash at [%d]"
+ % i)
+ else:
+ self[i] = new_hashes[i]
+ added.add(i)
+
+ # then we start from the bottom and compute new parent hashes upwards,
+ # comparing any that already exist. When this phase ends, all nodes
+ # that have a sibling will also have a parent.
+
+ hashes_to_check = list(new_hashes.keys())
+ # leaf-most first means reverse sorted order
+ while hashes_to_check:
+ hashes_to_check.sort()
+ i = hashes_to_check.pop(-1)
+ if i == 0:
+ # The root has no sibling. How lonely.
+ continue
+ if self[self.sibling(i)] is None:
+ # without a sibling, we can't compute a parent
+ continue
+ parentnum = self.parent(i)
+ # make sure we know right from left
+ leftnum, rightnum = sorted([i, self.sibling(i)])
+ new_parent_hash = pair_hash(self[leftnum], self[rightnum])
+ if self[parentnum]:
+ if self[parentnum] != new_parent_hash:
+ raise BadHashError("h([%d]+[%d]) != h[%d]" % (leftnum, rightnum,
+ parentnum))
+ else:
+ self[parentnum] = new_parent_hash
+ added.add(parentnum)
+ hashes_to_check.insert(0, parentnum)
+
+ # then we walk downwards from the top (root), and anything that is
+ # reachable is validated. If any of the hashes that we've added are
+ # unreachable, then they are unvalidated.
+
+ reachable = set()
+ if self[0]:
+ reachable.add(0)
+            # TODO: this could be done more efficiently, by starting from
+            # each element of new_hashes and walking upwards instead,
+            # remembering a set of validated nodes so that the searches for
+            # later new_hashes go faster. That would be O(k*ln(n)) for k new
+            # hashes, whereas this approach is O(n) in the size of the tree.
+ for i in range(1, len(self)):
+ if self[i] and self.parent(i) in reachable:
+ reachable.add(i)
+
+ # were we unable to validate any of the new hashes?
+ unvalidated = set(new_hashes.keys()) - reachable
+ if unvalidated:
+ if must_validate:
+ those = ",".join([str(i) for i in sorted(unvalidated)])
+ raise NotEnoughHashesError("unable to validate hashes %s" % those)
+
+ except (BadHashError, NotEnoughHashesError):
+ for i in added:
+ self[i] = None
+ raise
+
+ # if there were hashes that could not be validated, we return False
+ return not unvalidated
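+
+    # A worked example, using an 8-leaf tree (leaf 0 is node 7):
+    #   iht = IncompleteHashTree(8)
+    #   iht.set_hashes({0: roothash})   # remember the trusted root
+    #   iht.set_hashes({2: h2, 4: h4, 8: h8}, leaves={0: leaf0})
+    # The second call returns True if leaf0 validates against the root,
+    # and raises BadHashError if a computed parent hash disagrees.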
from twisted.internet import defer
from twisted.application import service
-from allmydata.util import idlib, mathutil
+from allmydata.util import idlib, mathutil, hashutil
from allmydata.util.assertutil import _assert
from allmydata import codec, chunk
from allmydata.Crypto.Cipher import AES
def finish(self):
return self.downloadable.finish()
+class ValidatedBucket:
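+    """I am a front-end for a remote RIBucketReader: before handing out a
+    block of data, I retrieve and verify whatever hashes are necessary to
+    validate that block all the way up to the share hash tree's root."""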
+ def __init__(self, sharenum, bucket, share_hash_tree, num_blocks):
+ self.sharenum = sharenum
+ self.bucket = bucket
+ self.share_hash_tree = share_hash_tree
+ self.block_hash_tree = chunk.IncompleteHashTree(num_blocks)
+
+ def get_block(self, blocknum):
+ d1 = self.bucket.callRemote('get_block', blocknum)
+ # we might also need to grab some elements of our block hash tree, to
+ # validate the requested block up to the share hash
+ if self.block_hash_tree.needed_hashes(leaves=[blocknum]):
+ d2 = self.bucket.callRemote('get_block_hashes')
+ else:
+ d2 = defer.succeed(None)
+ # we might need to grab some elements of the share hash tree to
+ # validate from our share hash up to the hashroot
+ if self.share_hash_tree.needed_hashes(leaves=[self.sharenum]):
+ d3 = self.bucket.callRemote('get_share_hashes')
+ else:
+ d3 = defer.succeed(None)
+ d = defer.gatherResults([d1, d2, d3])
+ d.addCallback(self._got_data, blocknum)
+ return d
+
+ def _got_data(self, res, blocknum):
+ blockdata, blockhashes, sharehashes = res
+ blockhash = hashutil.tagged_hash("encoded subshare", blockdata)
+        # we always validate the leaf hash: even when no new hashes were
+        # needed from the server, the block we just fetched must still
+        # match the leaf hash we already hold
+        bh = dict(enumerate(blockhashes or []))
+        self.block_hash_tree.set_hashes(bh, {blocknum: blockhash},
+                                        must_validate=True)
+        # likewise, the root of our block hash tree must always be checked
+        # against this share's leaf in the share hash tree
+        sh = dict(sharehashes or [])
+        sharehash = self.block_hash_tree[0]
+        self.share_hash_tree.set_hashes(sh, {self.sharenum: sharehash},
+                                        must_validate=True)
+        # If we made it here, the block is good. If the hash trees didn't
+        # like what they saw, they would have raised a BadHashError or a
+        # NotEnoughHashesError, causing our caller to see a Failure and
+        # thus ignore this block (as well as dropping this bucket).
+ return blockdata
+
+
class BlockDownloader:
- def __init__(self, bucket, blocknum, parent):
- self.bucket = bucket
+ def __init__(self, vbucket, blocknum, parent):
+ self.vbucket = vbucket
self.blocknum = blocknum
self.parent = parent
def start(self, segnum):
- d = self.bucket.callRemote('get_block', segnum)
+ d = self.vbucket.get_block(segnum)
d.addCallbacks(self._hold_block, self._got_block_error)
return d
def _got_block_error(self, f):
log.msg("BlockDownloader[%d] got error: %s" % (self.blocknum, f))
- self.parent.bucket_failed(self.blocknum, self.bucket)
+ self.parent.bucket_failed(self.blocknum, self.vbucket)
class SegmentDownloader:
def __init__(self, parent, segmentnumber, needed_shares):
return d
def _try(self):
- while len(self.parent.active_buckets) < self.needed_blocks:
- # need some more
- otherblocknums = list(set(self.parent._share_buckets.keys()) - set(self.parent.active_buckets.keys()))
- if not otherblocknums:
- raise NotEnoughPeersError
- blocknum = random.choice(otherblocknums)
- bucket = random.choice(list(self.parent._share_buckets[blocknum]))
- self.parent.active_buckets[blocknum] = bucket
-
+ # fill our set of active buckets, maybe raising NotEnoughPeersError
+ active_buckets = self.parent._activate_enough_buckets()
# Now we have enough buckets, in self.parent.active_buckets.
- # before we get any blocks of a given share, we need to be able to
- # validate that block and that share. Check to see if we have enough
- # hashes. If we don't, grab them before continuing.
- d = self._grab_needed_hashes()
- d.addCallback(self._download_some_blocks)
- return d
-
- def _grab_needed_hashes(self):
- # each bucket is holding the hashes necessary to validate their
- # share. So it suffices to ask everybody for all the hashes they know
- # about. Eventually we'll have all that we need, so we can stop
- # asking.
-
- # for each active share, see what hashes we need
- ht = self.parent.get_share_hashtree()
- needed_hashes = set()
- for shnum in self.parent.active_buckets:
- needed_hashes.update(ht.needed_hashes(shnum))
- if not needed_hashes:
- return defer.succeed(None)
-
- # for now, just ask everybody for everything
- # TODO: send fewer queries
- dl = []
- for shnum, bucket in self.parent.active_buckets.iteritems():
- d = bucket.callRemote("get_share_hashes")
- d.addCallback(self._got_share_hashes, shnum, bucket)
- dl.append(d)
- d.addCallback(self._validate_root)
- return defer.DeferredList(dl)
-
- def _got_share_hashes(self, share_hashes, shnum, bucket):
- ht = self.parent.get_share_hashtree()
- for hashnum, sharehash in share_hashes:
- # TODO: we're accumulating these hashes blindly, since we only
- # validate the leaves. This makes it possible for someone to
- # frame another server by giving us bad internal hashes. We pass
- # 'shnum' and 'bucket' in so that if we detected problems with
- # intermediate nodes, we could associate the error with the
- # bucket and stop using them.
- ht.set_hash(hashnum, sharehash)
-
- def _validate_root(self, res):
- # TODO: I dunno, check that the hash tree looks good so far and that
- # it adds up to the root. The idea is to reject any bad buckets
- # early.
- pass
-
- def _download_some_blocks(self, res):
# in test cases, bd.start might mutate active_buckets right away, so
# we need to put off calling start() until we've iterated all the way
- # through it
+ # through it.
downloaders = []
- for blocknum, bucket in self.parent.active_buckets.iteritems():
- bd = BlockDownloader(bucket, blocknum, self)
+ for blocknum, vbucket in active_buckets.iteritems():
+ bd = BlockDownloader(vbucket, blocknum, self)
downloaders.append(bd)
l = [bd.start(self.segmentnumber) for bd in downloaders]
- return defer.DeferredList(l)
+ return defer.DeferredList(l, fireOnOneErrback=True)
def hold_block(self, blocknum, data):
self.blocks[blocknum] = data
- def bucket_failed(self, shnum, bucket):
+ def bucket_failed(self, shnum, vbucket):
del self.parent.active_buckets[shnum]
s = self.parent._share_buckets[shnum]
- s.remove(bucket)
+ # s is a set of ValidatedBucket instances
+ s.remove(vbucket)
+ # ... which might now be empty
if not s:
+ # there are no more buckets which can provide this share, so
+ # remove the key. This may prompt us to use a different share.
del self.parent._share_buckets[shnum]
-
+
class FileDownloader:
debug = False
key = "\x00" * 16
self._output = Output(downloadable, key)
- # future:
- # each time we start using a new shnum, we must acquire a share hash
- # from one of the buckets that provides that shnum, then validate it
- # against the rest of the share hash tree that they provide. Then,
- # each time we get a block in that share, we must validate the block
- # against the rest of the subshare hash tree that that bucket will
- # provide.
-
self._share_hashtree = chunk.IncompleteHashTree(total_shares)
- #self._block_hashtrees = {} # k: shnum, v: hashtree
+ self._share_hashtree.set_hashes({0: roothash})
- def get_share_hashtree(self):
- return self._share_hashtree
+        self.active_buckets = {} # k: shnum, v: ValidatedBucket
+        self._share_buckets = {} # k: shnum, v: set of ValidatedBuckets
def start(self):
log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
if self.debug:
print "starting download"
- # first step: who should we download from?
- self.active_buckets = {} # k: shnum, v: bucket
- self._share_buckets = {} # k: shnum, v: set of buckets
+ # first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
d.addCallback(self._got_all_shareholders)
+ # once we know that, we can download blocks from them
d.addCallback(self._download_all_segments)
d.addCallback(self._done)
return d
def _got_response(self, buckets, connection):
_assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint
for sharenum, bucket in buckets.iteritems():
- self._share_buckets.setdefault(sharenum, set()).add(bucket)
-
+ self.add_share_bucket(sharenum, bucket)
+
+ def add_share_bucket(self, sharenum, bucket):
+ vbucket = ValidatedBucket(sharenum, bucket,
+ self._share_hashtree,
+ self._total_segments)
+ self._share_buckets.setdefault(sharenum, set()).add(vbucket)
+
def _got_error(self, f):
self._client.log("Somebody failed. -- %s" % (f,))
def _got_all_shareholders(self, res):
if len(self._share_buckets) < self._num_needed_shares:
raise NotEnoughPeersError
+ for s in self._share_buckets.values():
+ for vb in s:
+ assert isinstance(vb, ValidatedBucket), \
+ "vb is %s but should be a ValidatedBucket" % (vb,)
+
+
+ def _activate_enough_buckets(self):
+ """either return a mapping from shnum to a ValidatedBucket that can
+ provide data for that share, or raise NotEnoughPeersError"""
+
+ while len(self.active_buckets) < self._num_needed_shares:
+ # need some more
+ handled_shnums = set(self.active_buckets.keys())
+ available_shnums = set(self._share_buckets.keys())
+ potential_shnums = list(available_shnums - handled_shnums)
+ if not potential_shnums:
+ raise NotEnoughPeersError
+ # choose a random share
+ shnum = random.choice(potential_shnums)
+ # and a random bucket that will provide it
+ validated_bucket = random.choice(list(self._share_buckets[shnum]))
+ self.active_buckets[shnum] = validated_bucket
+ return self.active_buckets
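+    # (e.g. with 3-of-10 encoding this might end up as {2: vb2, 5: vb5,
+    # 9: vb9}: three distinct shares, each served by a randomly-chosen
+    # ValidatedBucket)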
- self.active_buckets = {}
- self._output.open()
def _download_all_segments(self, res):
+ # the promise: upon entry to this function, self._share_buckets
+ # contains enough buckets to complete the download, and some extra
+ # ones to tolerate some buckets dropping out or having errors.
+ # self._share_buckets is a dictionary that maps from shnum to a set
+ # of ValidatedBuckets, which themselves are wrappers around
+ # RIBucketReader references.
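+        # (for example, _share_buckets might look like
+        #  {0: set([vb_a, vb_b]), 3: set([vb_c])}: two sources for share 0
+        #  and one for share 3)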
+ self.active_buckets = {} # k: shnum, v: ValidatedBucket instance
+ self._output.open()
+
d = defer.succeed(None)
for segnum in range(self._total_segments-1):
d.addCallback(self._download_segment, segnum)
client = None
target = download.Data()
fd = download.FileDownloader(client, URI, target)
- fd._share_buckets = {}
for shnum in range(NUM_SHARES):
- fd._share_buckets[shnum] = set([all_shareholders[shnum]])
+ bucket = all_shareholders[shnum]
+ fd.add_share_bucket(shnum, bucket)
fd._got_all_shareholders(None)
d2 = fd._download_all_segments(None)
d2.addCallback(fd._done)
root = ht[0]
self.failUnlessEqual(len(root), 32)
+ def testDump(self):
+ ht = make_tree(6)
+ expected = [(0,0),
+ (1,1), (3,2), (7,3), (8,3), (4,2), (9,3), (10,3),
+ (2,1), (5,2), (11,3), (12,3), (6,2), (13,3), (14,3),
+ ]
+ self.failUnlessEqual(list(ht.depth_first()), expected)
+ d = "\n" + ht.dump()
+ #print d
+        self.failUnless("\n  0:" in d)
+        self.failUnless("\n   1:" in d)
+        self.failUnless("\n    3:" in d)
+        self.failUnless("\n     7:" in d)
+        self.failUnless("\n     8:" in d)
+        self.failUnless("\n    4:" in d)
+
class Incomplete(unittest.TestCase):
+
def testCheck(self):
# first create a complete hash tree
ht = make_tree(6)
# suppose we wanted to validate leaf[0]
# leaf[0] is the same as node[7]
- self.failUnlessEqual(iht.needed_hashes(0), set([8, 4, 2, 0]))
- self.failUnlessEqual(iht.needed_hashes(1), set([7, 4, 2, 0]))
- iht.set_hash(0, ht[0])
- self.failUnlessEqual(iht.needed_hashes(0), set([8, 4, 2]))
- self.failUnlessEqual(iht.needed_hashes(1), set([7, 4, 2]))
- iht.set_hash(5, ht[5])
- self.failUnlessEqual(iht.needed_hashes(0), set([8, 4, 2]))
- self.failUnlessEqual(iht.needed_hashes(1), set([7, 4, 2]))
-
+ self.failUnlessEqual(iht.needed_hashes(leaves=[0]), set([8, 4, 2, 0]))
+ self.failUnlessEqual(iht.needed_hashes(leaves=[1]), set([7, 4, 2, 0]))
+ iht.set_hashes({0: ht[0]}) # set the root
+ self.failUnlessEqual(iht.needed_hashes(leaves=[0]), set([8, 4, 2]))
+ self.failUnlessEqual(iht.needed_hashes(leaves=[1]), set([7, 4, 2]))
+ iht.set_hashes({5: ht[5]})
+ self.failUnlessEqual(iht.needed_hashes(leaves=[0]), set([8, 4, 2]))
+ self.failUnlessEqual(iht.needed_hashes(leaves=[1]), set([7, 4, 2]))
+
+ current_hashes = list(iht)
try:
# this should fail because there aren't enough hashes known
- iht.set_leaf(0, tagged_hash("tag", "0"))
+ iht.set_hashes(leaves={0: tagged_hash("tag", "0")},
+ must_validate=True)
except chunk.NotEnoughHashesError:
pass
else:
self.fail("didn't catch not enough hashes")
+ # and the set of hashes stored in the tree should still be the same
+ self.failUnlessEqual(list(iht), current_hashes)
+
# provide the missing hashes
- iht.set_hash(2, ht[2])
- iht.set_hash(4, ht[4])
- iht.set_hash(8, ht[8])
- self.failUnlessEqual(iht.needed_hashes(0), set([]))
+ iht.set_hashes({2: ht[2], 4: ht[4], 8: ht[8]})
+ self.failUnlessEqual(iht.needed_hashes(leaves=[0]), set())
try:
# this should fail because the hash is just plain wrong
- iht.set_leaf(0, tagged_hash("bad tag", "0"))
+ iht.set_hashes(leaves={0: tagged_hash("bad tag", "0")})
except chunk.BadHashError:
pass
else:
try:
# this should succeed
- iht.set_leaf(0, tagged_hash("tag", "0"))
+ iht.set_hashes(leaves={0: tagged_hash("tag", "0")})
except chunk.BadHashError, e:
self.fail("bad hash: %s" % e)
try:
# this should succeed too
- iht.set_leaf(1, tagged_hash("tag", "1"))
+ iht.set_hashes(leaves={1: tagged_hash("tag", "1")})
except chunk.BadHashError:
self.fail("bad hash")
-        # giving it a bad internal hash should also cause problems
+        # storing a bad hash for one node should also cause problems for
+        # its neighbors
- iht.set_hash(2, tagged_hash("bad tag", "x"))
+ iht.set_hashes({13: tagged_hash("bad tag", "x")})
try:
- iht.set_leaf(0, tagged_hash("tag", "0"))
+ iht.set_hashes({14: tagged_hash("tag", "14")})
except chunk.BadHashError:
pass
else:
self.fail("didn't catch bad hash")
# undo our damage
- iht.set_hash(2, ht[2])
+ iht[13] = None
- self.failUnlessEqual(iht.needed_hashes(4), set([12, 6]))
+ self.failUnlessEqual(iht.needed_hashes(leaves=[4]), set([12, 6]))
- iht.set_hash(6, ht[6])
- iht.set_hash(12, ht[12])
+ iht.set_hashes({6: ht[6], 12: ht[12]})
try:
# this should succeed
- iht.set_leaf(4, tagged_hash("tag", "4"))
+ iht.set_hashes(leaves={4: tagged_hash("tag", "4")})
except chunk.BadHashError, e:
self.fail("bad hash: %s" % e)