from allmydata.util import idlib, mathutil
from allmydata.util.assertutil import _assert
-from allmydata import codec
+from allmydata import codec, chunk
from allmydata.Crypto.Cipher import AES
from allmydata.uri import unpack_uri
from allmydata.interfaces import IDownloadTarget, IDownloader
# Now we have enough buckets, in self.parent.active_buckets.
+ # before we get any blocks of a given share, we need to be able to
+ # validate that block and that share. Check to see if we have enough
+ # hashes. If we don't, grab them before continuing.
+ d = self._grab_needed_hashes()
+ d.addCallback(self._download_some_blocks)
+ return d
+
+ def _grab_needed_hashes(self):
+ # each bucket is holding the hashes necessary to validate their
+ # share. So it suffices to ask everybody for all the hashes they know
+ # about. Eventually we'll have all that we need, so we can stop
+ # asking.
+
+ # for each active share, see what hashes we need
+ ht = self.parent.get_share_hashtree()
+ needed_hashes = set()
+ for shnum in self.parent.active_buckets:
+ needed_hashes.update(ht.needed_hashes(shnum))
+ if not needed_hashes:
+ return defer.succeed(None)
+
+ # for now, just ask everybody for everything
+ # TODO: send fewer queries
+ dl = []
+ for shnum, bucket in self.parent.active_buckets.iteritems():
+ d = bucket.callRemote("get_share_hashes")
+ d.addCallback(self._got_share_hashes, shnum, bucket)
+ dl.append(d)
+ d.addCallback(self._validate_root)
+ return defer.DeferredList(dl)
+
+ def _got_share_hashes(self, share_hashes, shnum, bucket):
+ ht = self.parent.get_share_hashtree()
+ for hashnum, sharehash in share_hashes:
+ # TODO: we're accumulating these hashes blindly, since we only
+ # validate the leaves. This makes it possible for someone to
+ # frame another server by giving us bad internal hashes. We pass
+ # 'shnum' and 'bucket' in so that if we detected problems with
+ # intermediate nodes, we could associate the error with the
+ # bucket and stop using them.
+ ht.set_hash(hashnum, sharehash)
+
+ def _validate_root(self, res):
+ # TODO: I dunno, check that the hash tree looks good so far and that
+ # it adds up to the root. The idea is to reject any bad buckets
+ # early.
+ pass
+
+ def _download_some_blocks(self, res):
# in test cases, bd.start might mutate active_buckets right away, so
# we need to put off calling start() until we've iterated all the way
# through it
self._output = Output(downloadable, key)
# future:
- # self._share_hash_tree = ??
- # self._subshare_hash_trees = {} # k:shnum, v: hashtree
# each time we start using a new shnum, we must acquire a share hash
- # from one of the buckets that provides that shnum, then validate it against
- # the rest of the share hash tree that they provide. Then, each time we
- # get a block in that share, we must validate the block against the rest
- # of the subshare hash tree that that bucket will provide.
+ # from one of the buckets that provides that shnum, then validate it
+ # against the rest of the share hash tree that they provide. Then,
+ # each time we get a block in that share, we must validate the block
+ # against the rest of the subshare hash tree that that bucket will
+ # provide.
+
+ self._share_hashtree = chunk.IncompleteHashTree(total_shares)
+ #self._block_hashtrees = {} # k: shnum, v: hashtree
+
+ def get_share_hashtree(self):
+ return self._share_hashtree
def start(self):
log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
def remote_put_share_hashes(self, sharehashes):
precondition(not self.closed)
- self._write_file('sharehashree', bencode.bencode(sharehashes))
+ self._write_file('sharehashes', bencode.bencode(sharehashes))
def remote_close(self):
precondition(not self.closed)
return str2l(self._read_file('blockhashes'))
def remote_get_share_hashes(self):
- return bencode.bdecode(self._read_file('sharehashes'))
-
+ hashes = bencode.bdecode(self._read_file('sharehashes'))
+ # tuples come through bdecode(bencode()) as lists, which violates the
+ # schema
+ return [tuple(i) for i in hashes]
+
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
name = 'storageserver'