encode: add plaintext/crypttext merkle trees to the shares, and the thingA block...
authorBrian Warner <warner@lothar.com>
Thu, 7 Jun 2007 02:40:20 +0000 (19:40 -0700)
committerBrian Warner <warner@lothar.com>
Thu, 7 Jun 2007 02:40:20 +0000 (19:40 -0700)
src/allmydata/encode.py
src/allmydata/hashtree.py
src/allmydata/interfaces.py
src/allmydata/storageserver.py
src/allmydata/test/test_encode.py

index 84faa77036323b88945dafd36a478c35ef68bb2c..14d1804bed7a1663e316c313a5893f1322a8ecfb 100644 (file)
@@ -3,7 +3,8 @@
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.python import log
-from allmydata.hashtree import HashTree, block_hash, thingA_hash
+from allmydata.hashtree import HashTree, IncompleteHashTree, \
+     block_hash, thingA_hash, plaintext_hash, crypttext_hash
 from allmydata.Crypto.Cipher import AES
 from allmydata.util import mathutil, bencode
 from allmydata.util.assertutil import _assert
@@ -164,8 +165,10 @@ class Encoder(object):
                                               self.segment_size)
         self.share_size = mathutil.div_ceil(self.file_size,
                                             self.required_shares)
+        self._plaintext_hashes = []
+        self._crypttext_hashes = []
         self.setup_encryption()
-        self.setup_codec()
+        self.setup_codec() # TODO: duplicate call?
         d = defer.succeed(None)
 
         for i in range(self.num_segments-1):
@@ -176,6 +179,10 @@ class Encoder(object):
             d.addCallback(lambda res, i=i: self.do_segment(i))
         d.addCallback(lambda res: self.do_tail_segment(self.num_segments-1))
 
+        d.addCallback(lambda res:
+                      self.send_plaintext_hash_tree_to_all_shareholders())
+        d.addCallback(lambda res:
+                      self.send_crypttext_hash_tree_to_all_shareholders())
         d.addCallback(lambda res: self.send_all_subshare_hash_trees())
         d.addCallback(lambda res: self.send_all_share_hash_trees())
         d.addCallback(lambda res: self.send_thingA_to_all_shareholders())
@@ -216,8 +223,10 @@ class Encoder(object):
             input_piece = self.infile.read(input_piece_size)
             # non-tail segments should be the full segment size
             assert len(input_piece) == input_piece_size
+            self._plaintext_hashes.append(plaintext_hash(input_piece))
             encrypted_piece = self.cryptor.encrypt(input_piece)
             chunks.append(encrypted_piece)
+            self._crypttext_hashes.append(crypttext_hash(encrypted_piece))
         d = codec.encode(chunks)
         d.addCallback(self._encoded_segment, segnum)
         return d
@@ -229,10 +238,12 @@ class Encoder(object):
 
         for i in range(self.required_shares):
             input_piece = self.infile.read(input_piece_size)
+            self._plaintext_hashes.append(plaintext_hash(input_piece))
             if len(input_piece) < input_piece_size:
                 # padding
                 input_piece += ('\x00' * (input_piece_size - len(input_piece)))
             encrypted_piece = self.cryptor.encrypt(input_piece)
+            self._crypttext_hashes.append(crypttext_hash(encrypted_piece))
             chunks.append(encrypted_piece)
         d = codec.encode(chunks)
         d.addCallback(self._encoded_segment, segnum)
@@ -300,6 +311,42 @@ class Encoder(object):
             d0.addErrback(_eatNotEnoughPeersError)
         return d
 
+    def send_plaintext_hash_tree_to_all_shareholders(self):
+        log.msg("%s sending plaintext hash tree" % self)
+        t = HashTree(self._plaintext_hashes)
+        all_hashes = list(t)
+        self.thingA_data["plaintext_root_hash"] = t[0]
+        dl = []
+        for shareid in self.landlords.keys():
+            dl.append(self.send_plaintext_hash_tree(shareid, all_hashes))
+        return self._gather_responses(dl)
+
+    def send_plaintext_hash_tree(self, shareid, all_hashes):
+        if shareid not in self.landlords:
+            return defer.succeed(None)
+        sh = self.landlords[shareid]
+        d = sh.callRemote("put_plaintext_hashes", all_hashes)
+        d.addErrback(self._remove_shareholder, shareid, "put_plaintext_hashes")
+        return d
+
+    def send_crypttext_hash_tree_to_all_shareholders(self):
+        log.msg("%s sending crypttext hash tree" % self)
+        t = HashTree(self._crypttext_hashes)
+        all_hashes = list(t)
+        self.thingA_data["crypttext_root_hash"] = t[0]
+        dl = []
+        for shareid in self.landlords.keys():
+            dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
+        return self._gather_responses(dl)
+
+    def send_crypttext_hash_tree(self, shareid, all_hashes):
+        if shareid not in self.landlords:
+            return defer.succeed(None)
+        sh = self.landlords[shareid]
+        d = sh.callRemote("put_crypttext_hashes", all_hashes)
+        d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
+        return d
+
     def send_all_subshare_hash_trees(self):
         log.msg("%s sending subshare hash trees" % self)
         dl = []
index 2640cfc1766cb4a4bf6c4e482344580dd3d29e53..3694b91b0bc29b8162c8bcbf1eed8973f98d4dc0 100644 (file)
@@ -442,3 +442,9 @@ def block_hash(data):
 
 def thingA_hash(data):
     return tagged_hash("thingA", data)
+
+def plaintext_hash(data):
+    return tagged_hash("plaintext segment", data)
+
+def crypttext_hash(data):
+    return tagged_hash("crypttext segment", data)
index f419145b8702116f77f2239bb1e8eedc6358e856..2917ee460ce0e743222528547f67470886dcd763 100644 (file)
@@ -51,7 +51,12 @@ class RIBucketWriter(RemoteInterface):
         bytes in length. The last segment might be shorter.
         """
         return None
-    
+
+    def put_plaintext_hashes(hashes=ListOf(Hash, maxLength=2**20)):
+        return None
+    def put_crypttext_hashes(hashes=ListOf(Hash, maxLength=2**20)):
+        return None
+
     def put_block_hashes(blockhashes=ListOf(Hash, maxLength=2**20)):
         return None
         
index 310a965e53532d6977120dd86283598c5a935722..788c638968ef094e9ffc03d5cf1f9acd234c756f 100644 (file)
@@ -49,6 +49,18 @@ class BucketWriter(Referenceable):
         f.seek(self.blocksize*segmentnum)
         f.write(data)
 
+    def remote_put_plaintext_hashes(self, hashes):
+        precondition(not self.closed)
+        # TODO: verify the length of hashes.
+        # TODO: tighten foolscap schema to require exactly 32 bytes.
+        self._write_file('plaintext_hashes', ''.join(hashes))
+
+    def remote_put_crypttext_hashes(self, hashes):
+        precondition(not self.closed)
+        # TODO: verify the length of hashes.
+        # TODO: tighten foolscap schema to require exactly 32 bytes.
+        self._write_file('crypttext_hashes', ''.join(hashes))
+
     def remote_put_block_hashes(self, blockhashes):
         precondition(not self.closed)
         # TODO: verify the length of blockhashes.
index d8f58087b4f85fd22deec6d1bffb13071e5c9fcb..cb6f2e37cd386dbfc8cf9e77599e4a413e60470f 100644 (file)
@@ -49,6 +49,8 @@ class FakeBucketWriter:
     def __init__(self, mode="good"):
         self.mode = mode
         self.blocks = {}
+        self.plaintext_hashes = None
+        self.crypttext_hashes = None
         self.block_hashes = None
         self.share_hashes = None
         self.closed = False
@@ -66,6 +68,16 @@ class FakeBucketWriter:
             raise LostPeerError("I'm going away now")
         self.blocks[segmentnum] = data
 
+    def put_plaintext_hashes(self, hashes):
+        assert not self.closed
+        assert self.plaintext_hashes is None
+        self.plaintext_hashes = hashes
+
+    def put_crypttext_hashes(self, hashes):
+        assert not self.closed
+        assert self.crypttext_hashes is None
+        self.crypttext_hashes = hashes
+
     def put_block_hashes(self, blockhashes):
         assert not self.closed
         assert self.block_hashes is None