From: Brian Warner Date: Thu, 14 Dec 2006 03:32:35 +0000 (-0700) Subject: start work on new encoder, with merkle trees and subshares and stuff X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~436 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/vdrive/global?a=commitdiff_plain;h=133e4a439457792734c7b4c3fd282ca220ee1c05;p=tahoe-lafs%2Ftahoe-lafs.git start work on new encoder, with merkle trees and subshares and stuff --- diff --git a/allmydata/chunk.py b/allmydata/chunk.py new file mode 100644 index 00000000..ec904191 --- /dev/null +++ b/allmydata/chunk.py @@ -0,0 +1,727 @@ + +""" +Read and write chunks from files. + +Version 1.0.0. + +A file is divided into blocks, each of which has size L{BLOCK_SIZE} +(except for the last block, which may be smaller). Blocks are encoded +into chunks. One publishes the hash of the entire file. Clients +who want to download the file first obtain the hash, then the clients +can receive chunks in any order. Cryptographic hashing is used to +verify each received chunk before writing to disk. Thus it is +impossible to download corrupt data if one has the correct file hash. + +One obtains the hash of a complete file via +L{CompleteChunkFile.file_hash}. One can read chunks from a complete +file by the sequence operations of C{len()} and subscripting on a +L{CompleteChunkFile} object. One can open an empty or partially +downloaded file with L{PartialChunkFile}, and read and write chunks +to this file. A chunk will fail to write if its contents and index +are not consistent with the overall file hash passed to +L{PartialChunkFile} when the partial chunk file was first created. + +The chunks have an overhead of less than 4% for files of size +less than C{10**20} bytes. + +Benchmarks: + + - On a 3 GHz Pentium 3, it took 3.4 minutes to first make a + L{CompleteChunkFile} object for a 4 GB file. Up to 10 MB of + memory was used as the constructor ran. A metafile filename + was passed to the constructor, and so the hash information was + written to the metafile. The object used a negligible amount + of memory after the constructor was finished. + - Creation of L{CompleteChunkFile} objects in future runs of the + program took negligible time, since the hash information was + already stored in the metafile. + +@var BLOCK_SIZE: Size of a block. See L{BlockFile}. +@var MAX_CHUNK_SIZE: Upper bound on the size of a chunk. + See L{CompleteChunkFile}. + +free (adj.): unencumbered; not under the control of others +Written by Connelly Barnes in 2005 and released into the +public domain with no warranty of any kind, either expressed +or implied. It probably won't make your computer catch on fire, +or eat your children, but it might. Use at your own risk. +""" + +import sha +import os +#import os.path + +from allmydata.util import bencode + +__all__ = ['CompleteChunkFile', 'PartialChunkFile'] + +__version__ = '1.0.0' + +BLOCK_SIZE = 65536 +MAX_CHUNK_SIZE = BLOCK_SIZE + 4096 + +def hash(s): + """ + Cryptographic hash function used by this module. + """ + return sha.new(s).digest() + + +def roundup_pow2(x): + """ + Round integer C{x} up to the nearest power of 2. + """ + ans = 1 + while ans < x: + ans *= 2 + return ans + + +class CompleteBinaryTreeMixin: + """ + Adds convenience methods to a complete binary tree. + + Assumes the total number of elements in the binary tree may be + accessed via C{__len__}, and that each element can be retrieved + using list subscripting. + + Tree is indexed like so:: + + + 0 + / \ + 1 2 + / \ / \ + 3 4 5 6 + / \ / \ / \ / \ + 7 8 9 10 11 12 13 14 + + """ + def parent(self, i): + """ + Index of the parent of C{i}. + """ + if i < 1 or (hasattr(self, '__len__') and i >= len(self)): + raise IndexError('index out of range: ' + repr(i)) + return (i - 1) // 2 + + def lchild(self, i): + """ + Index of the left child of C{i}. + """ + ans = 2 * i + 1 + if i < 0 or (hasattr(self, '__len__') and ans >= len(self)): + raise IndexError('index out of range: ' + repr(i)) + return ans + + def rchild(self, i): + """ + Index of right child of C{i}. + """ + ans = 2 * i + 2 + if i < 0 or (hasattr(self, '__len__') and ans >= len(self)): + raise IndexError('index out of range: ' + repr(i)) + return ans + + def sibling(self, i): + """ + Index of sibling of C{i}. + """ + parent = self.parent(i) + if self.lchild(parent) == i: + return self.rchild(parent) + else: + return self.lchild(parent) + + def needed(self, i): + """ + Return a list of nodes that are necessary for the hash chain. + """ + if i < 0 or i >= len(self): + raise IndexError('index out of range: ' + repr(i)) + needed = [] + here = i + while here != 0: + needed.append(self.sibling(here)) + here = self.parent(here) + return needed + + +class HashTree(CompleteBinaryTreeMixin, list): + """ + Compute Merkle hashes at any node in a complete binary tree. + + Tree is indexed like so:: + + + 0 + / \ + 1 2 + / \ / \ + 3 4 5 6 + / \ / \ / \ / \ + 7 8 9 10 11 12 13 14 <- List passed to constructor. + + """ + def __init__(self, L): + """ + Create complete binary tree from list of hash strings. + + The list is augmented by hashes so its length is a power of 2, and + then this is used as the bottom row of the hash tree. + + The augmenting is done so that if the augmented element is at + index C{i}, then its value is C{hash(bencode.bencode((i, '')))}. + """ + # Augment the list. + start = len(L) + end = roundup_pow2(len(L)) + L = L + [None] * (end - start) + for i in range(start, end): + L[i] = hash(bencode.bencode((i, ''))) + # Form each row of the tree. + rows = [L] + while len(rows[-1]) != 1: + last = rows[-1] + rows += [[hash(last[2*i] + last[2*i+1]) for i in xrange(len(last)//2)]] + # Flatten the list of rows into a single list. + rows.reverse() + self[:] = sum(rows, []) + + +class BlockFile: + """ + Reads and writes blocks of data to a binary file. + + It is assumed that the binary file does not change in size. + + @ivar file_name: Full path to file. + @ivar file_size: Size of file in bytes. + @ivar block_size: Size of each block. + """ + def __init__(self, file_name, mode, block_size, file_size=None): + """ + Initialize block reader or writer on given file name. + + If mode is 'r', the file must already exist and it is opened for + reading only. If mode is 'w', the file will be created with size + C{file_size} if it does not exist, and it is opened for reading + and writing. + + Note that C{file_size} is ignored if the file already exists. + """ + self.mode = mode + self.file_name = os.path.abspath(file_name) + assert self.mode in ['r', 'w'] + + if mode == 'r': + f = open(self.file_name, 'rb') + f.close() + + # Create file if it doesn't exist. + created = False + if mode == 'w' and not os.path.exists(self.file_name): + created = True + buf = ' ' * 1024 + f = open(self.file_name, 'wb') + for i in xrange(file_size // len(buf)): + f.write(buf) + f.write(' ' * (file_size % len(buf))) + f.close() + + self.file_size = os.stat(self.file_name).st_size + if created: + assert self.file_size == file_size + self.block_size = block_size + self.__block_count = self.file_size // self.block_size + if self.file_size % self.block_size == 0: + self.last_block_size = self.block_size + else: + self.last_block_size = self.file_size % self.block_size + self.__block_count += 1 + + def __getitem__(self, i): + """ + Get block i. + """ + if i < 0 or i >= len(self): + raise IndexError('block index out of range: ' + repr(i)) + f = open(self.file_name, 'rb') + try: + f.seek(i * self.block_size) + ans = f.read(self.block_size) + finally: + f.close() + return ans + + def __setitem__(self, i, s): + """ + Set block i. + """ + if self.mode != 'w': + raise ValueError('file opened for reading only') + if i < 0 or i >= len(self): + raise IndexError('block index out of range: ' + repr(i)) + if i < len(self) - 1: + if len(s) != self.block_size: + raise ValueError('length of value must equal block_size') + else: + if len(s) != self.last_block_size: + raise ValueError('length of value must equal last_block_size') + f = open(self.file_name, 'rb+') + try: + f.seek(i * self.block_size) + f.write(s) + finally: + f.close() + + def __len__(self): + """ + Get number of blocks. + """ + return int(self.__block_count) + + +class MetaFile(CompleteBinaryTreeMixin): + """ + A L{HashTree} stored on disk, with a timestamp. + + The list of hashes can be accessed using subscripting and + C{__len__}, in the same manner as for L{HashTree}. + + Note that the constructor takes the entire list associated with + the L{HashTree}, not just the bottom row of the tree. + + @ivar meta_name: Full path to metafile. + """ + def __init__(self, meta_name, mode, L=None): + """ + Open an existing meta-file for reading or writing. + + If C{mode} is 'r', the meta-file must already exist and it is + opened for reading only, and the list C{L} is ignored. If C{mode} + is 'w', the file will be created if it does not exist (from the + list of hashes given in C{L}), and it is opened for reading and + writing. + """ + self.meta_name = os.path.abspath(meta_name) + self.mode = mode + assert self.mode in ['r', 'w'] + + # A timestamp is stored at index 0. The MetaFile instance + # offsets all indices passed to __getitem__, __setitem__ by + # this offset, and pretends it has length equal to + # self.sublength. + self.offset = 1 + + if self.mode == 'w': + suggested_length = len(hash('')) * (len(L)+self.offset) + else: + suggested_length = None + + created = False + if self.mode == 'w' and not os.path.exists(self.meta_name): + created = True + + self.block_file = BlockFile(self.meta_name, self.mode, + len(hash('')), + suggested_length) + self.sublength = len(self.block_file) - self.offset + + if created: + for i in xrange(len(L)): + self.block_file[i + self.offset] = L[i] + + def __getitem__(self, i): + if i < 0 or i >= self.sublength: + raise IndexError('bad meta-file block index') + return self.block_file[i + self.offset] + + def __setitem__(self, i, value): + if i < 0 or i >= self.sublength: + raise IndexError('bad meta-file block index') + self.block_file[i + self.offset] = value + + def __len__(self): + return self.sublength + + def set_timestamp(self, file_name): + """ + Set meta file's timestamp equal to the timestamp for C{file_name}. + """ + st = os.stat(file_name) + timestamp = bencode.bencode((st.st_size, st.st_mtime)) + self.block_file[0] = sha.new(timestamp).digest() + + def check_timestamp(self, file_name): + """ + True if meta file's timestamp equals timestamp for C{file_name}. + """ + st = os.stat(file_name) + timestamp = bencode.bencode((st.st_size, st.st_mtime)) + return self.block_file[0] == sha.new(timestamp).digest() + + +class CompleteChunkFile(BlockFile): + """ + Reads chunks from a fully-downloaded file. + + A chunk C{i} is created from block C{i}. Block C{i} is unencoded + data read from the file by the L{BlockFile}. Chunk C{i} is + an encoded string created from block C{i}. + + Chunks can be read using list subscripting. The total number of + chunks (equals the total number of blocks) is given by L{__len__}. + + @ivar file_name: Full path to file. + @ivar file_size: Size of file in bytes. + @ivar file_hash: Hash of file. + @ivar meta_name: Full path to metafile, or C{None}. + @ivar tree: L{HashTree} or L{MetaFile} instance for the file. + One can extract a hash from any node in the hash + tree. + """ + + def __init__(self, file_name, meta_name=None, callback=None): + """ + Initialize reader on the given file name. + + The entire file will be read and the hash will be computed from + the file. This may take a long time, so C{callback()} is called + frequently during this process. This allows you to reduce CPU + usage if you wish. + + The C{meta_name} argument is optional. If it is specified, then the + hashes for C{file_name} will be stored under the file + C{meta_name}. If a C{CompleteChunkFile} is created on the same + file and metafile in the future, then the hashes will not need to + be recomputed and the constructor will return instantly. The + metafile contains a file and date stamp, so that if the file stored + in C{file_name} is modified, then the hashes will be recomputed. + """ + BlockFile.__init__(self, file_name, 'r', block_size=65536) + + # Whether we need to compute the hash tree + compute_tree = False + + self.meta_name = meta_name + if self.meta_name != None: + self.meta_name = os.path.abspath(self.meta_name) + self.meta = None + if self.meta_name == None: + compute_tree = True + else: + try: + meta = MetaFile(self.meta_name, 'r') + assert meta.check_timestamp(self.file_name) + except (IOError, AssertionError): + compute_tree = True + + # Compute the hash tree if needed. + if compute_tree: + chunk_hashes = [None] * len(self) + for i in xrange(len(self)): + triple = (self.file_size, i, BlockFile.__getitem__(self, i)) + chunk_hashes[i] = hash(bencode.bencode(triple)) + if callback: + callback() + self.tree = HashTree(chunk_hashes) + del chunk_hashes + + # If a meta-file was given, make self.tree be a MetaFile instance. + if self.meta_name != None: + if compute_tree: + # Did we compute the hash tree? Then store it to disk. + self.tree = MetaFile(self.meta_name, 'w', self.tree) + # Update its timestamp to be consistent with the file we + # just hashed. + self.tree.set_timestamp(self.file_name) + else: + # Read existing file from disk. + self.tree = MetaFile(self.meta_name, 'r') + + self.file_hash = self.tree[0] + + def __getitem__(self, i): + """ + Get chunk C{i}. + + Raises C{ValueError} if the file's contents changed since the + CompleteFileChunkReader was instantiated. + """ + return encode_chunk(BlockFile.__getitem__(self, i), i, + self.file_size, self.tree) + + +def encode_chunk(block, index, file_size, tree): + """ + Encode a chunk. + + Given a block at index C{index} in a file with size C{file_size}, + and a L{HashTree} or L{MetaFile} instance C{tree}, computes and + returns a chunk string for the given block. + + The C{tree} argument needs to have correct hashes only at certain + indices. Check out the code for details. In any case, if a hash + is wrong an exception will be raised. + """ + block_count = (len(tree) + 1) // 2 + if index < 0 or index >= block_count: + raise IndexError('block index out of range: ' + repr(index)) + + suffix = bencode.bencode((file_size, index, block)) + current = len(tree) - block_count + index + prefix = [] + while current > 0: + sibling = tree.sibling(current) + prefix += [tree[current], tree[sibling]] + current = tree.parent(current) + prefix = ''.join(prefix) + + # Encode the chunk + chunk = bencode.bencode((prefix, suffix)) + + # Check to make sure it decodes properly. + decode_chunk(chunk, file_size, tree) + return chunk + + +def decode_chunk(chunk, file_size, tree): + """ + Decode a chunk. + + Given file with size C{file_size} and a L{HashTree} or L{MetaFile} + instance C{tree}, return C{(index, block, tree_items)}. Here + C{index} is the block index where string C{block} should be placed + in the file. Also C{tree_items} is a dict mapping indices within + the L{HashTree} or L{MetaFile} tree object associated with the + given file to the corresponding hashes at those indices. These + have been verified against the file's hash, so it is known that + they are correct. + + Raises C{ValueError} if chunk verification fails. + """ + file_hash = tree[0] + block_count = (len(tree) + 1) // 2 + try: + # Decode the chunk + try: + (prefix, suffix) = bencode.bdecode(chunk) + except: + raise AssertionError() + + assert isinstance(prefix, str) + assert isinstance(suffix, str) + + # Verify the suffix against the hashes in the prefix. + hash_len = len(hash('')) + L = [prefix[hash_len*i:hash_len*(i+1)] for i in range(len(prefix)//hash_len)] + L += [file_hash] + assert L[0] == hash(suffix) + branches = [] + for i in range(0, len(L)-1, 2): + if hash(L[i] + L[i+1]) == L[i+2]: + branches += [0] + elif hash(L[i+1] + L[i]) == L[i+2]: + branches += [1] + else: + raise AssertionError() + + # Decode the suffix + try: + (claim_file_size, claim_index, block) = bencode.bdecode(suffix) + except: + raise AssertionError() + + assert isinstance(claim_file_size, int) or isinstance(claim_file_size, long) + assert isinstance(claim_index, int) or isinstance(claim_index, long) + assert isinstance(block, str) + + assert file_size == claim_file_size + + # Compute the index of the block, and check it. + found_index = sum([branches[i]*2**i for i in range(len(branches))]) + assert found_index == claim_index + + # Now fill in the tree_items dict. + tree_items = {} + current = (len(tree) - block_count) + found_index + i = 0 + while current > 0 and i + 1 < len(L): + tree_items[current] = L[i] + # Next item is our sibling. + tree_items[tree.sibling(current)] = L[i+1] + i += 2 + current = tree.parent(current) + + return (found_index, block, tree_items) + except AssertionError: + raise ValueError('corrupt chunk') + + +class PartialChunkFile(BlockFile): + """ + Reads and writes chunks to a partially downloaded file. + + @ivar file_name: Full path to file. + @ivar file_size: Size of file in bytes. + @ivar file_hash: Hash of file. + @ivar meta_name: Full path to metafile. + @ivar tree: L{MetaFile} instance for the file. + The hashes in this hash tree are valid only for + nodes that we have been sent hashes for. + """ + def __init__(self, file_name, meta_name, file_hash=None, file_size=None): + """ + Initialize reader/writer for the given file name and metafile name. + + If neither C{file_name} nor C{meta_file} exist, then both are + created. The C{file_hash} and C{file_size} arguments are used to + initialize the two files. + + If both C{file_name} and C{meta_file} exist, then the hash and + file size arguments are ignored, and those values are instead read + from the files. + + If one file exists and the other does not, an C{IOError} is raised. + """ + self.meta_name = os.path.abspath(meta_name) + meta_exists = os.path.exists(self.meta_name) + file_exists = os.path.exists(os.path.abspath(file_name)) + + BlockFile.__init__(self, os.path.abspath(file_name), 'w', + BLOCK_SIZE, file_size) + + if file_exists and not meta_exists: + raise IOError('metafile ' + repr(self.meta_name) + + ' missing for file ' + repr(self.file_name)) + if meta_exists and not file_exists: + raise IOError('file ' + repr(self.file_name) + + ' missing for metafile ' + repr(self.meta_name)) + tree_count = 2 * roundup_pow2(len(self)) - 1 + self.tree = MetaFile(self.meta_name, 'w', [hash('')] * tree_count) + + if not meta_exists and not file_exists: + self.tree[0] = file_hash + + self.file_hash = self.tree[0] + + def __getitem__(self, i): + """ + Get chunk C{i}. + + Raises C{ValueError} if chunk has not yet been downloaded or is + corrupted. + """ + return encode_chunk(BlockFile.__getitem__(self, i), i, + self.file_size, self.tree) + + def __setitem__(self, i, chunk): + """ + Set chunk C{i}. + + Raises C{ValueError} if the chunk is invalid. + """ + (index, block, tree_items) = decode_chunk(chunk, + self.file_size, self.tree) + if index != i: + raise ValueError('incorrect index for chunk') + BlockFile.__setitem__(self, index, block) + for (tree_index, tree_value) in tree_items.items(): + self.tree[tree_index] = tree_value + + +def test(filename1='temp-out', metaname1='temp-out.meta', + filename2='temp-out2', metaname2='temp-out2.meta'): + """ + Unit tests. + """ + print 'Testing:' + + import random + ntests = 100 + max_file_size = 200000 + + # Test CompleteChunkFile. + + if os.path.exists(metaname1): + os.remove(metaname1) + + for i in range(ntests): + fsize = random.randrange(max_file_size) + # Make some random string of size 'fsize' to go in the file. + s = ''.join([sha.new(str(j)).digest() for j in range(fsize//20+1)]) + assert len(s) >= fsize + s = s[:fsize] + f = open(filename1, 'wb') + f.write(s) + f.close() + C = CompleteChunkFile(filename1) + for j in range(len(C)): + C[j] + C = CompleteChunkFile(filename1, metaname1) + for j in range(len(C)): + C[j] + C = CompleteChunkFile(filename1, metaname1) + for j in range(len(C)): + C[j] + os.remove(metaname1) + + os.remove(filename1) + + print ' CompleteChunkFile: OK' + + # Test PartialChunkFile + + for i in range(ntests): + fsize = random.randrange(max_file_size) + # Make some random string of size 'fsize' to go in the file. + s = ''.join([sha.new(str(j)).digest() for j in range(fsize//20+1)]) + assert len(s) >= fsize + s = s[:fsize] + f = open(filename1, 'wb') + f.write(s) + f.close() + C1 = CompleteChunkFile(filename1) + if os.path.exists(filename2): + os.remove(filename2) + + if os.path.exists(metaname2): + os.remove(metaname2) + C2 = PartialChunkFile(filename2, metaname2, C1.file_hash, C1.file_size) + assert len(C1) == len(C2) + assert C2.tree[0] == C1.tree[0] + for j in range(len(C2)): + try: + C2[j] + ok = False + except ValueError: + ok = True + if not ok: + raise AssertionError() + for j in range(len(C2)//2): + k = random.randrange(len(C2)) + if len(C1) > 1: + assert C1[k] != C1[(k+1)%len(C1)] + try: + C2[k] = C1[(k+1)%len(C1)] + ok = False + except ValueError: + ok = True + if not ok: + raise AssertionError() + C2[k] = C1[k] + assert C2[k] == C1[k] + for j in range(len(C2)): + C2[j] = C1[j] + assert C2[j] == C1[j] + + os.remove(filename1) + os.remove(filename2) + os.remove(metaname2) + + print ' PartialChunkFile: OK' + + +if __name__ == '__main__': + test() diff --git a/allmydata/encode_new.py b/allmydata/encode_new.py new file mode 100644 index 00000000..a291a829 --- /dev/null +++ b/allmydata/encode_new.py @@ -0,0 +1,204 @@ +#! /usr/bin/python + +import math +from twisted.internet import defer +from allmydata.chunk import HashTree +from Crypto.Cipher import AES +import sha + +def hash(data): + return sha.new(data).digest() + +""" + +The goal of the encoder is to turn the original file into a series of +'shares'. Each share is going to a 'shareholder' (nominally each shareholder +is a different host, but for small meshes there may be overlap). The number +of shares is chosen to hit our reliability goals (more shares on more +machines means more reliability), and is limited by overhead (proportional to +numshares or log(numshares)) and the encoding technology in use (Reed-Solomon +only permits 256 shares total). It is also constrained by the amount of data +we want to send to each host. For estimating purposes, think of 100 shares +out of which we need 25 to reconstruct the file. + +The encoder starts by cutting the original file into segments. All segments +except the last are of equal size. The segment size is chosen to constrain +the memory footprint (which will probably vary between 1x and 4x segment +size) and to constrain the overhead (which will be proportional to either the +number of segments or log(number of segments)). + + +Each segment (A,B,C) is read into memory, encrypted, and encoded into +subshares. The 'share' (say, share #1) that makes it out to a host is a +collection of these subshares (subshare A1, B1, C1), plus some hash-tree +information necessary to validate the data upon retrieval. Only one segment +is handled at a time: all subshares for segment A are delivered before any +work is begun on segment B. + +As subshares are created, we retain the hash of each one. The list of +subshare hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is +used to form the base of a Merkle hash tree for that share (hashtrees[1]). +This hash tree has one terminal leaf per subshare. The complete subshare hash +tree is sent to the shareholder after all the data has been sent. At +retrieval time, the decoder will ask for specific pieces of this tree before +asking for subshares, whichever it needs to validate those subshares. + +[TODO: we don't really need to generate this whole subshare hash tree +ourselves. It would be sufficient to have the shareholder generate it and +just tell us the root. This gives us an extra level of validation on the +transfer, though, and it is relatively cheap to compute.] + +Each of these subshare hash trees has a root hash. The collection of these +root hashes for all shares are collected into the 'share hash tree', which +has one terminal leaf per share. After sending the subshares and the complete +subshare hash tree to each shareholder, we send them the portion of the share +hash tree that is necessary to validate their share. The root of the share +hash tree is put into the URI. + +""" + + + + +class Encoder(object): + + def setup(self, infile): + self.infile = infile + infile.seek(0, 2) + self.file_size = infile.tell() + infile.seek(0, 0) + fsize = 1.0 * self.file_size + self.segment_size = 1024 + self.num_segments = int(math.ceil(fsize / self.segment_size)) + + self.num_shares = 100 + self.share_size = self.file_size / 25 + + def get_reservation_size(self): + self.num_shares = 100 + self.share_size = self.file_size / 25 + overhead = self.compute_overhead() + return self.share_size + overhead + + def setup_encryption(self): + self.key = "\x00"*16 + self.cryptor = AES.new(key=self.key, mode=AES.MODE_CTR, + counterstart="\x00"*16) + self.segment_num = 0 + self.subshare_hashes = [[]] * self.num_shares + # subshare_hashes[i] is a list that will be accumulated and then send + # to landlord[i]. This list contains a hash of each segment_share + # that we sent to that landlord. + self.share_root_hashes = [None] * self.num_shares + + def start(self): + self.setup_encryption() + d = defer.succeed(None) + for i in range(self.num_segments): + d.addCallback(lambda res: self.do_segment(i)) + d.addCallback(lambda res: self.send_all_subshare_hash_trees()) + d.addCallback(lambda res: self.send_all_share_hash_trees()) + d.addCallback(lambda res: self.close_all_shareholders()) + d.addCallback(lambda res: self.done()) + return d + + def encode_segment(self, crypttext): + shares = [crypttext] * self.num_shares + return shares + + def do_segment(self, segnum): + segment_plaintext = self.infile.read(self.segment_size) + segment_crypttext = self.cryptor.encrypt(segment_plaintext) + del segment_plaintext + subshares_for_this_segment = self.encode_segment(segment_crypttext) + del segment_crypttext + dl = [] + for share_num,subshare in enumerate(subshares_for_this_segment): + d = self.send_subshare(share_num, self.segment_num, subshare) + dl.append(d) + self.subshare_hashes[share_num].append(hash(subshare)) + self.segment_num += 1 + return defer.DeferredList(dl) + + def send_subshare(self, share_num, segment_num, subshare): + #if False: + # offset = hash_size + segment_num * segment_size + # return self.send(share_num, "write", subshare, offset) + return self.send(share_num, "put_subshare", segment_num, subshare) + + def send(self, share_num, methname, *args, **kwargs): + ll = self.landlords[share_num] + return ll.callRemote(methname, *args, **kwargs) + + def send_all_subshare_hash_trees(self): + dl = [] + for share_num,hashes in enumerate(self.subshare_hashes): + # hashes is a list of the hashes of all subshares that were sent + # to shareholder[share_num]. + dl.append(self.send_one_subshare_hash_tree(share_num, hashes)) + return defer.DeferredList(dl) + + def send_one_subshare_hash_tree(self, share_num, subshare_hashes): + t = HashTree(subshare_hashes) + all_hashes = list(t) + # all_hashes[0] is the root hash, == hash(ah[1]+ah[2]) + # all_hashes[1] is the left child, == hash(ah[3]+ah[4]) + # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2]) + self.share_root_hashes[share_num] = t[0] + ll = self.landlords[share_num] + if False: + block = "".join(all_hashes) + return ll.callRemote("write", block, offset=0) + return ll.callRemote("put_subshare_hashes", all_hashes) + + def send_all_share_hash_trees(self): + dl = [] + for h in self.share_root_hashes: + assert h + # create the share hash tree + t = HashTree(self.share_root_hashes) + # the root of this hash tree goes into our URI + self.root_hash = t[0] + # now send just the necessary pieces out to each shareholder + for i in range(self.num_shares): + needed_hash_indices = t.needed_for(i) + dl.append(self.send_one_share_hash_tree(i, needed_hash_indices)) + return defer.DeferredList(dl) + + def send_one_share_hash_tree(self, share_num, needed_hashes): + ll = self.landlords[share_num] + return ll.callRemote("put_share_hashes", needed_hashes) + + def close_all_shareholders(self): + dl = [] + for ll in self.landlords: + dl.append(ll.callRemote("close")) + return defer.DeferredList(dl) + + def done(self): + return self.root_hash + + +from foolscap import RemoteInterface +from foolscap.schema import ListOf, TupleOf, Nothing +_None = Nothing() + + +class RIStorageBucketWriter(RemoteInterface): + def put_subshare(segment_number=int, subshare=str): + return _None + def put_segment_hashes(all_hashes=ListOf(str)): + return _None + def put_share_hashes(needed_hashes=ListOf(TupleOf(int,str))): + return _None + #def write(data=str, offset=int): + # return _None +class RIStorageBucketReader(RemoteInterface): + def get_share_hashes(): + return ListOf(TupleOf(int,str)) + def get_segment_hashes(which=ListOf(int)): + return ListOf(str) + def get_subshare(segment_number=int): + return str + #def read(size=int, offset=int): + # return str diff --git a/allmydata/test/test_encode.py b/allmydata/test/test_encode.py new file mode 100644 index 00000000..9e8b42c3 --- /dev/null +++ b/allmydata/test/test_encode.py @@ -0,0 +1,19 @@ +#! /usr/bin/python + +from twisted.trial import unittest +from twisted.internet import defer +from allmydata import encode_new +from cStringIO import StringIO + +class MyEncoder(encode_new.Encoder): + def send(self, share_num, methname, *args, **kwargs): + return defer.succeed(None) + +class Encode(unittest.TestCase): + def OFFtest_1(self): + e = MyEncoder() + data = StringIO("some data to encode\n") + e.setup(data) + d = e.start() + return d + diff --git a/allmydata/util/bencode.py b/allmydata/util/bencode.py new file mode 100644 index 00000000..d507a6ce --- /dev/null +++ b/allmydata/util/bencode.py @@ -0,0 +1,433 @@ +#!/usr/bin/env python +# -*- coding: MacRoman -*- +""" +A library for streaming and unstreaming of simple objects, designed +for speed, compactness, and ease of implementation. + +The basic functions are bencode and bdecode. bencode takes an object +and returns a string, bdecode takes a string and returns an object. +bdecode raises a ValueError if you give it an invalid string. + +The objects passed in may be nested dicts, lists, ints, floats, strings, +and Python boolean and None types. For example, all of the following +may be bencoded - + +{'a': [0, 1], 'b': None} + +[None, ['a', 2, ['c', None]]] + +{'spam': (2,3,4)} + +{'name': 'Cronus', 'spouse': 'Rhea', 'children': ['Hades', 'Poseidon']} + +In general bdecode(bencode(spam)) == spam, but tuples and lists are +encoded the same, so bdecode(bencode((0, 1))) is [0, 1] rather +than (0, 1). Longs and ints are also encoded the same way, so +bdecode(bencode(4)) is a long. + +Dict keys are required to be basestrings (byte strings or unicode objects), +to avoid a mess of potential implementation incompatibilities. bencode is +intended to be used for protocols which are going to be re-implemented many +times, so it's very conservative in that regard. + +Which type is encoded is determined by the first character, 'i', 'n', 'f', +'d', 'l', 'b', 'u', and any digit. They indicate integer, null, float, +dict, list, boolean, unicode string, and string, respectively. + +Strings are length-prefixed in base 10, followed by a colon. + +bencode('spam') == '4:spam' + +Unicode string objects are indicated with an initial u, a base 10 +length-prefix, and the remaining bytes in utf-8 encoding. + +bencode(u'\u00bfHabla espa\u00f1ol?') == 'ËHabla espaÐol?' + +Nulls are indicated by a single 'n'. + +bencode(None) == 'n' + +Integers are encoded base 10 and terminated with an 'e' - + +bencode(3) == 'i3e' +bencode(-20) == 'i-20e' + +Floats are encoded in base 10 and terminated with an 'e' - + +bencode(3.2) == 'f3.2e' +bencode(-23.4532) == 'f-23.4532e' + +Lists are encoded in list order, terminated by an 'e' - + +bencode(['abc', 'd']) == 'l3:abc1:de' +bencode([2, 'f']) == 'li2e1:fe' + +Dicts are encoded by containing alternating keys and values. +The keys are encoded in sorted order, but sort order is not +enforced on the decode. Dicts are terminated by an 'e'. Dict +keys can be either bytestrings or unicode strings. For example - + +bencode({'spam': 'eggs'}) == 'd4:spam4:eggse' +bencode({'ab': 2, 'a': None}) == 'd1:an2:abi2ee' +bencode({'a' : 1, u'\xab': 2}) == 'd1:ai1eu4:\xfe\xff\x00\xa8i2ee' + +Truncated strings come first, so in sort order 'a' comes before 'abc'. +""" + +# This file is licensed under the GNU Lesser General Public License v2.1. +# +# Originally written by Mojo Nation. +# Rewritten by Bram Cohen. +# Further enhanced by Allmydata to support additional Python types (Boolean +# None, Float, and Unicode strings.) + +from types import IntType, LongType, FloatType, ListType, TupleType, DictType, StringType, UnicodeType, BooleanType, NoneType +from cStringIO import StringIO +import string + +def bencode(data): + """ + encodes objects as strings, see module documentation for more info + """ + result = StringIO() + bwrite(data, result) + return result.getvalue() + +def bwrite(data, result): + # a generic using pje's type dispatch will be faster here + try: + encoder = encoders[type(data)] + except KeyError: + encoder = None + # Catch subclasses of built-in types + for t,coder in encoders.items(): + if isinstance(data, t): + encoder = coder + break + if not encoder: + raise ValueError("unsupported data type: %s" % type(data)) + encoder(data, result) + +encoders = {} + +def encode_int(data, result): + result.write('i' + str(data) + 'e') + +encoders[IntType] = encode_int +encoders[LongType] = encode_int + +def encode_float(data, result): + result.write('f' + str(data) + 'e') + +encoders[FloatType] = encode_float + +def encode_bool(data, result): + if data: + result.write('b1') + else: + result.write('b0') + +encoders[BooleanType] = encode_bool + +def encode_list(data, result): + result.write('l') + _bwrite = bwrite + for item in data: + _bwrite(item, result) + result.write('e') + +encoders[TupleType] = encode_list +encoders[ListType] = encode_list +encoders[set] = encode_list + +def encode_string(data, result): + result.write(str(len(data)) + ':' + data) + +encoders[StringType] = encode_string + +def encode_unicode(data, result): + payload = data.encode('utf-8') + result.write('u' + str(len(payload)) + ':' + payload) + +encoders[UnicodeType] = encode_unicode + +def encode_dict(data, result): + result.write('d') + _bwrite = bwrite + keylist = data.keys() + keylist.sort() + for key in keylist: + _bwrite(key, result) + _bwrite(data[key], result) + result.write('e') + +encoders[DictType] = encode_dict + +encoders[NoneType] = lambda data, result: result.write('n') + +def bdecode(s): + """ + Does the opposite of bencode. Raises a ValueError if there's a problem. + """ + try: + result, index = bread(s, 0) + if index != len(s): + raise ValueError('left over stuff at end: %s' % s[index:]) + return result + except IndexError, e: + raise ValueError(str(e)) + except KeyError, e: + raise ValueError(str(e)) + +def bread(s, index): + return decoders[s[index]](s, index) + +decoders = {} + +def decode_raw_string(s, index): + ci = s.index(":", index) + ei = ci + int(s[index:ci]) + 1 + if ei > len(s): + raise ValueError('length encoding indicates premature end of string') + return (s[ci+1:ei], ei) + +for c in string.digits: + decoders[c] = decode_raw_string + +def decode_unicode_string(s, index): + ci = s.index(":", index) + ei = ci + int(s[index+1:ci]) + 1 + if ei > len(s): + raise ValueError('length encoding indicates premature end of string') + return (unicode(s[ci+1:ei], 'utf-8'), ei) + +decoders['u'] = decode_unicode_string + +def decode_int(s, index): + ei = s.index('e', index) + return (long(s[index+1:ei]), ei+1) + +decoders['i'] = decode_int + +def decode_float(s, index): + ei = s.index('e', index) + return (float(s[index+1:ei]), ei+1) + +decoders['f'] = decode_float + +def decode_bool(s, index): + val = s[index+1] + if val == '1': + return True, index+2 + elif val == '0': + return False, index+2 + else: + raise ValueError('invalid boolean encoding: %s' % s[index:index+2]) + +decoders['b'] = decode_bool + +# decoders['n'] = lambda s, index: decoders_n.inc('n') or (None, index + 1) +decoders['n'] = lambda s, index: (None, index + 1) + +def decode_list(s, index): + # decoders_n.inc('l') + result = [] + index += 1 + _bread = bread + while s[index] != 'e': + next, index = _bread(s, index) + result.append(next) + return result, index + 1 + +decoders['l'] = decode_list + +def decode_dict(s, index): + # decoders_n.inc('d') + result = {} + index += 1 + _decode_string = decode_raw_string + _decode_unicode = decode_unicode_string + _bread = bread + while s[index] != 'e': + if s[index] in string.digits: + key, index = _decode_string(s, index) + elif s[index] == "u": + key, index = _decode_unicode(s, index) + else: + raise ValueError("dict key must be basestring") + if key in result: + raise ValueError("dict key was repeated") + value, index = _bread(s, index) + result[key] = value + return result, index + 1 + +decoders['d'] = decode_dict + +def test_decode_raw_string(): + assert decode_raw_string('1:a', 0) == ('a', 3) + assert decode_raw_string('0:', 0) == ('', 2) + assert decode_raw_string('10:aaaaaaaaaaaaaaaaaaaaaaaaa', 0) == ('aaaaaaaaaa', 13) + assert decode_raw_string('10:', 1) == ('', 3) +# non-reexp version does not check for this case +# try: +# decode_raw_string('01:a', 0) +# assert 0, 'failed' +# except ValueError: +# pass + try: + decode_raw_string('--1:a', 0) + assert 0, 'failed' + except ValueError: + pass + try: + decode_raw_string('h', 0) + assert 0, 'failed' + except ValueError: + pass + try: + decode_raw_string('h:', 0) + assert 0, 'failed' + except ValueError: + pass + try: + decode_raw_string('1', 0) + assert 0, 'failed' + except ValueError: + pass + try: + decode_raw_string('', 0) + assert 0, 'failed' + except ValueError: + pass + try: + decode_raw_string('5:a', 0) + assert 0, 'failed' + except ValueError: + pass + +def test_encode_and_decode_unicode_results_in_unicode_type(): + assert bdecode(bencode(u'\u00bfHabla espa\u00f1ol?')) == u'\u00bfHabla espa\u00f1ol?' + +def test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type(): + test_string = bdecode(bencode(u'\u00bfHabla espa\u00f1ol?')) + if isinstance(test_string, unicode): + assert test_string == u'\u00bfHabla espa\u00f1ol?' + elif isinstance(test_string, str): + assert test_string.decode('utf-8') == u'\u00bfHabla espa\u00f1ol?' + else: + assert 0, 'flunked' + +def test_dict_forbids_non_string_key(): + try: + bdecode('di3ene') + assert 0, 'failed' + except ValueError: + pass + +def test_dict_forbids_key_repeat(): + try: + bdecode('d1:an1:ane') + assert 0, 'failed' + except ValueError: + pass + +def test_empty_dict(): + assert bdecode('de') == {} + +def test_dict_allows_unicode_keys(): + assert bdecode(bencode({'a': 1, u'\xa8': 2})) == {'a': 1L, u'\xa8': 2L} + +def test_ValueError_in_decode_unknown(): + try: + bdecode('x') + assert 0, 'flunked' + except ValueError: + pass + +def test_encode_and_decode_none(): + assert bdecode(bencode(None)) == None + +def test_encode_and_decode_long(): + assert bdecode(bencode(-23452422452342L)) == -23452422452342L + +def test_encode_and_decode_int(): + assert bdecode(bencode(2)) == 2 + +def test_encode_and_decode_float(): + assert bdecode(bencode(3.4)) == 3.4 + assert bdecode(bencode(0.0)) == 0.0 + assert bdecode(bencode(-4.56)) == -4.56 + assert bdecode(bencode(-0.0)) == -0.0 + +def test_encode_and_decode_bool(): + assert bdecode(bencode(True)) == True + assert bdecode(bencode(False)) == False + +# the non-regexp methods no longer check for canonical ints, but we +# don't parse input we did not generate using bencode, so I will leave +# these commented out for now +#def test_decode_noncanonical_int(): +# try: +# bdecode('i03e') +# assert 0 +# except ValueError: +# pass +# try: +# bdecode('i3 e') +# assert 0 +# except ValueError: +# pass +# try: +# bdecode('i 3e') +# assert 0 +# except ValueError: +# pass +# try: +# bdecode('i-0e') +# assert 0 +# except ValueError: +# pass + +def test_encode_and_decode_dict(): + x = {'42': 3} + assert bdecode(bencode(x)) == x + +def test_encode_and_decode_list(): + assert bdecode(bencode([])) == [] + +def test_encode_and_decode_tuple(): + assert bdecode(bencode(())) == [] + +def test_encode_and_decode_empty_dict(): + assert bdecode(bencode({})) == {} + +def test_encode_and_decode_complex_object(): + spam = [[], 0, -3, -345234523543245234523L, {}, 'spam', None, {'a': [3]}, {}, {'a': 1L, u'\xa8': 2L}] + assert bencode(bdecode(bencode(spam))) == bencode(spam) + assert bdecode(bencode(spam)) == spam + +def test_unfinished_list(): + try: + bdecode('ln') + assert 0 + except ValueError: + pass + +def test_unfinished_dict(): + try: + bdecode('d') + assert 0 + except ValueError: + pass + try: + bdecode('d1:a') + assert 0 + except ValueError: + pass + +def test_unsupported_type(): + try: + bencode(lambda: None) + assert 0 + except ValueError: + pass \ No newline at end of file