download: retrieve share hashes when downloading. We don't really do much validation...
authorBrian Warner <warner@lothar.com>
Sat, 7 Apr 2007 05:51:19 +0000 (22:51 -0700)
committerBrian Warner <warner@lothar.com>
Sat, 7 Apr 2007 05:51:19 +0000 (22:51 -0700)
src/allmydata/download.py
src/allmydata/storageserver.py

index 12cc03de2941a4ab71aa22b81bc561fb63159fae..458e7f2a649d8b5d36a58e581433e3dbaf48f878 100644 (file)
@@ -7,7 +7,7 @@ from twisted.application import service
 
 from allmydata.util import idlib, mathutil
 from allmydata.util.assertutil import _assert
-from allmydata import codec
+from allmydata import codec, chunk
 from allmydata.Crypto.Cipher import AES
 from allmydata.uri import unpack_uri
 from allmydata.interfaces import IDownloadTarget, IDownloader
@@ -105,6 +105,55 @@ class SegmentDownloader:
 
         # Now we have enough buckets, in self.parent.active_buckets.
 
+        # before we get any blocks of a given share, we need to be able to
+        # validate that block and that share. Check to see if we have enough
+        # hashes. If we don't, grab them before continuing.
+        d = self._grab_needed_hashes()
+        d.addCallback(self._download_some_blocks)
+        return d
+
+    def _grab_needed_hashes(self):
+        # each bucket is holding the hashes necessary to validate their
+        # share. So it suffices to ask everybody for all the hashes they know
+        # about. Eventually we'll have all that we need, so we can stop
+        # asking.
+
+        # for each active share, see what hashes we need
+        ht = self.parent.get_share_hashtree()
+        needed_hashes = set()
+        for shnum in self.parent.active_buckets:
+            needed_hashes.update(ht.needed_hashes(shnum))
+        if not needed_hashes:
+            return defer.succeed(None)
+
+        # for now, just ask everybody for everything
+        # TODO: send fewer queries
+        dl = []
+        for shnum, bucket in self.parent.active_buckets.iteritems():
+            d = bucket.callRemote("get_share_hashes")
+            d.addCallback(self._got_share_hashes, shnum, bucket)
+            dl.append(d)
+        d.addCallback(self._validate_root)
+        return defer.DeferredList(dl)
+
+    def _got_share_hashes(self, share_hashes, shnum, bucket):
+        ht = self.parent.get_share_hashtree()
+        for hashnum, sharehash in share_hashes:
+            # TODO: we're accumulating these hashes blindly, since we only
+            # validate the leaves. This makes it possible for someone to
+            # frame another server by giving us bad internal hashes. We pass
+            # 'shnum' and 'bucket' in so that if we detected problems with
+            # intermediate nodes, we could associate the error with the
+            # bucket and stop using them.
+            ht.set_hash(hashnum, sharehash)
+
+    def _validate_root(self, res):
+        # TODO: I dunno, check that the hash tree looks good so far and that
+        # it adds up to the root. The idea is to reject any bad buckets
+        # early.
+        pass
+
+    def _download_some_blocks(self, res):
         # in test cases, bd.start might mutate active_buckets right away, so
         # we need to put off calling start() until we've iterated all the way
         # through it
@@ -153,13 +202,18 @@ class FileDownloader:
         self._output = Output(downloadable, key)
 
         # future:
-        # self._share_hash_tree = ??
-        # self._subshare_hash_trees = {} # k:shnum, v: hashtree
         # each time we start using a new shnum, we must acquire a share hash
-        # from one of the buckets that provides that shnum, then validate it against
-        # the rest of the share hash tree that they provide. Then, each time we
-        # get a block in that share, we must validate the block against the rest
-        # of the subshare hash tree that that bucket will provide.
+        # from one of the buckets that provides that shnum, then validate it
+        # against the rest of the share hash tree that they provide. Then,
+        # each time we get a block in that share, we must validate the block
+        # against the rest of the subshare hash tree that that bucket will
+        # provide.
+
+        self._share_hashtree = chunk.IncompleteHashTree(total_shares)
+        #self._block_hashtrees = {} # k: shnum, v: hashtree
+
+    def get_share_hashtree(self):
+        return self._share_hashtree
 
     def start(self):
         log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
index df9b1a615cb846e201b88df63cda784c0bdcc28a..5bab5e26bac793b0a702df769622549f823912f0 100644 (file)
@@ -51,7 +51,7 @@ class BucketWriter(Referenceable):
 
     def remote_put_share_hashes(self, sharehashes):
         precondition(not self.closed)
-        self._write_file('sharehashree', bencode.bencode(sharehashes))
+        self._write_file('sharehashes', bencode.bencode(sharehashes))
 
     def remote_close(self):
         precondition(not self.closed)
@@ -89,8 +89,11 @@ class BucketReader(Referenceable):
         return str2l(self._read_file('blockhashes'))
 
     def remote_get_share_hashes(self):
-        return bencode.bdecode(self._read_file('sharehashes'))
-   
+        hashes = bencode.bdecode(self._read_file('sharehashes'))
+        # tuples come through bdecode(bencode()) as lists, which violates the
+        # schema
+        return [tuple(i) for i in hashes]
+
 class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'