From: Brian Warner Date: Tue, 26 Jun 2007 19:36:21 +0000 (-0700) Subject: vdrive: protect dirnode contents with an HMAC X-Git-Tag: allmydata-tahoe-0.4.0~30 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/%22doc.html/running.html?a=commitdiff_plain;h=bc2603c818d6faaddc50be09d4fe6f203ee16101;p=tahoe-lafs%2Ftahoe-lafs.git vdrive: protect dirnode contents with an HMAC --- diff --git a/src/allmydata/test/test_vdrive.py b/src/allmydata/test/test_vdrive.py index 4ae201a9..2349cefa 100644 --- a/src/allmydata/test/test_vdrive.py +++ b/src/allmydata/test/test_vdrive.py @@ -255,3 +255,35 @@ class Test(unittest.TestCase): sorted(expected_keys)) return res +def flip_bit(data, offset): + if offset < 0: + offset = len(data) + offset + return data[:offset] + chr(ord(data[offset]) ^ 0x01) + data[offset+1:] + +class Encryption(unittest.TestCase): + def test_loopback(self): + key = "k" * 16 + data = "This is some plaintext data." + crypttext = vdrive.encrypt(key, data) + plaintext = vdrive.decrypt(key, crypttext) + self.failUnlessEqual(data, plaintext) + + def test_hmac(self): + key = "j" * 16 + data = "This is some more plaintext data." + crypttext = vdrive.encrypt(key, data) + # flip a bit in the IV + self.failUnlessRaises(vdrive.IntegrityCheckError, + vdrive.decrypt, + key, flip_bit(crypttext, 0)) + # flip a bit in the crypttext + self.failUnlessRaises(vdrive.IntegrityCheckError, + vdrive.decrypt, + key, flip_bit(crypttext, 16)) + # flip a bit in the HMAC + self.failUnlessRaises(vdrive.IntegrityCheckError, + vdrive.decrypt, + key, flip_bit(crypttext, -1)) + plaintext = vdrive.decrypt(key, crypttext) + self.failUnlessEqual(data, plaintext) + diff --git a/src/allmydata/util/hashutil.py b/src/allmydata/util/hashutil.py index d44003c5..db8c937c 100644 --- a/src/allmydata/util/hashutil.py +++ b/src/allmydata/util/hashutil.py @@ -79,3 +79,13 @@ def generate_dirnode_keys_from_writekey(write_key): def generate_dirnode_keys_from_readkey(read_key): index = dir_index_hash(read_key) return None, None, read_key, index + +def _xor(a, b): + return "".join([chr(ord(c) ^ ord(b)) for c in a]) + +def hmac(tag, data): + ikey = _xor(tag, "\x36") + okey = _xor(tag, "\x5c") + h1 = SHA256.new(ikey + data).digest() + h2 = SHA256.new(okey + h1).digest() + return h2 diff --git a/src/allmydata/vdrive.py b/src/allmydata/vdrive.py index 8a30e932..c1876598 100644 --- a/src/allmydata/vdrive.py +++ b/src/allmydata/vdrive.py @@ -27,23 +27,29 @@ def create_directory_node(client, diruri): d.addCallback(_got) return d +IV_LENGTH = 14 def encrypt(key, data): - # TODO: add the hmac - IV = os.urandom(14) - counterstart = IV + "\x00"*2 + IV = os.urandom(IV_LENGTH) + counterstart = IV + "\x00"*(16-IV_LENGTH) assert len(counterstart) == 16, len(counterstart) cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart=counterstart) crypttext = cryptor.encrypt(data) - return IV + crypttext + mac = hashutil.hmac(key, IV + crypttext) + assert len(mac) == 32 + return IV + crypttext + mac + +class IntegrityCheckError(Exception): + pass def decrypt(key, data): - # TODO: validate the hmac - assert len(data) >= 14, len(data) - IV = data[:14] - counterstart = IV + "\x00"*2 + assert len(data) >= (32+IV_LENGTH), len(data) + IV, crypttext, mac = data[:IV_LENGTH], data[IV_LENGTH:-32], data[-32:] + if mac != hashutil.hmac(key, IV+crypttext): + raise IntegrityCheckError("HMAC does not match, crypttext is corrupted") + counterstart = IV + "\x00"*(16-IV_LENGTH) assert len(counterstart) == 16, len(counterstart) cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart=counterstart) - plaintext = cryptor.decrypt(data[14:]) + plaintext = cryptor.decrypt(crypttext) return plaintext