from twisted.python import log
from allmydata import uri
from allmydata.hashtree import HashTree
-from allmydata.Crypto.Cipher import AES
from allmydata.util import mathutil, hashutil
from allmydata.util.assertutil import _assert
from allmydata.codec import CRSEncoder
-from allmydata.interfaces import IEncoder, IStorageBucketWriter
+from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
+ IEncryptedUploadable
"""
self.SHARES_OF_HAPPINESS = happy
self.TOTAL_SHARES = n
self.uri_extension_data = {}
+ self._codec = None
def set_size(self, size):
+ assert not self._codec
self.file_size = size
def set_params(self, encoding_parameters):
+ assert not self._codec
k,d,n = encoding_parameters
self.NEEDED_SHARES = k
self.SHARES_OF_HAPPINESS = d
self.TOTAL_SHARES = n
- def set_uploadable(self, uploadable):
- self._uploadable = uploadable
-
- def setup(self):
-
+ def _setup_codec(self):
self.num_shares = self.TOTAL_SHARES
self.required_shares = self.NEEDED_SHARES
self.shares_of_happiness = self.SHARES_OF_HAPPINESS
# this must be a multiple of self.required_shares
self.segment_size = mathutil.next_multiple(self.segment_size,
self.required_shares)
- self._setup_codec()
- def _setup_codec(self):
+ # now set up the codec
+
assert self.segment_size % self.required_shares == 0
self.num_segments = mathutil.div_ceil(self.file_size,
self.segment_size)
self.required_shares, self.num_shares)
data['tail_codec_params'] = self._tail_codec.get_serialized_params()
- def get_serialized_params(self):
- return self._codec.get_serialized_params()
-
- def set_encryption_key(self, key):
- assert isinstance(key, str)
- assert len(key) == 16 # AES-128
- self.key = key
-
- def get_share_size(self):
+ def _get_share_size(self):
share_size = mathutil.div_ceil(self.file_size, self.required_shares)
- overhead = self.compute_overhead()
+ overhead = self._compute_overhead()
return share_size + overhead
- def compute_overhead(self):
+
+ def _compute_overhead(self):
return 0
- def get_block_size(self):
- return self._codec.get_block_size()
- def get_num_segments(self):
- return self.num_segments
+
+ def set_encrypted_uploadable(self, uploadable):
+ u = self._uploadable = IEncryptedUploadable(uploadable)
+ d = u.get_size()
+ d.addCallback(self.set_size)
+ d.addCallback(lambda res: self.get_param("serialized_params"))
+ d.addCallback(u.set_serialized_encoding_parameters)
+ d.addCallback(lambda res: u.get_storage_index())
+ def _done(storage_index):
+ self._storage_index = storage_index
+ return self
+ d.addCallback(_done)
+ return d
+
+ def get_param(self, name):
+ if not self._codec:
+ self._setup_codec()
+
+ if name == "storage_index":
+ return self._storage_index
+ elif name == "share_counts":
+ return (self.required_shares, self.shares_of_happiness,
+ self.num_shares)
+ elif name == "num_segments":
+ return self.num_segments
+ elif name == "segment_size":
+ return self.segment_size
+ elif name == "block_size":
+ return self._codec.get_block_size()
+ elif name == "share_size":
+ return self._get_share_size()
+ elif name == "serialized_params":
+ return self._codec.get_serialized_params()
+ else:
+ raise KeyError("unknown parameter name '%s'" % name)
def set_shareholders(self, landlords):
assert isinstance(landlords, dict)
def start(self):
#paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
- self._plaintext_hasher = hashutil.plaintext_hasher()
- self._plaintext_hashes = []
+ if not self._codec:
+ self._setup_codec()
+
self._crypttext_hasher = hashutil.crypttext_hasher()
self._crypttext_hashes = []
- self.setup_encryption()
- d = defer.succeed(None)
+ self.segment_num = 0
+ self.subshare_hashes = [[] for x in range(self.num_shares)]
+ # subshare_hashes[i] is a list that will be accumulated and then send
+ # to landlord[i]. This list contains a hash of each segment_share
+ # that we sent to that landlord.
+ self.share_root_hashes = [None] * self.num_shares
+
+ d = defer.maybeDeferred(self._uploadable.set_segment_size,
+ self.segment_size)
for l in self.landlords.values():
d.addCallback(lambda res, l=l: l.start())
d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
d.addCallback(self._send_segment, last_segnum)
- d.addCallback(lambda res: self.finish_flat_hashes())
+ d.addCallback(lambda res: self.finish_hashing())
d.addCallback(lambda res:
self.send_plaintext_hash_tree_to_all_shareholders())
d.addCallbacks(lambda res: self.done(), self.err)
return d
- def setup_encryption(self):
- self.cryptor = AES.new(key=self.key, mode=AES.MODE_CTR,
- counterstart="\x00"*16)
- self.segment_num = 0
- self.subshare_hashes = [[] for x in range(self.num_shares)]
- # subshare_hashes[i] is a list that will be accumulated and then send
- # to landlord[i]. This list contains a hash of each segment_share
- # that we sent to that landlord.
- self.share_root_hashes = [None] * self.num_shares
-
def _encode_segment(self, segnum):
codec = self._codec
# of additional shares which can be substituted if the primary ones
# are unavailable
- plaintext_segment_hasher = hashutil.plaintext_segment_hasher()
crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
# memory footprint: we only hold a tiny piece of the plaintext at any
# footprint to 500KiB at the expense of more hash-tree overhead.
d = self._gather_data(self.required_shares, input_piece_size,
- plaintext_segment_hasher,
crypttext_segment_hasher)
def _done(chunks):
for c in chunks:
assert len(c) == input_piece_size
- self._plaintext_hashes.append(plaintext_segment_hasher.digest())
self._crypttext_hashes.append(crypttext_segment_hasher.digest())
# during this call, we hit 5*segsize memory
return codec.encode(chunks)
codec = self._tail_codec
input_piece_size = codec.get_block_size()
- plaintext_segment_hasher = hashutil.plaintext_segment_hasher()
crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
d = self._gather_data(self.required_shares, input_piece_size,
- plaintext_segment_hasher,
crypttext_segment_hasher,
allow_short=True)
def _done(chunks):
# a short trailing chunk will have been padded by
# _gather_data
assert len(c) == input_piece_size
- self._plaintext_hashes.append(plaintext_segment_hasher.digest())
self._crypttext_hashes.append(crypttext_segment_hasher.digest())
return codec.encode(chunks)
d.addCallback(_done)
return d
def _gather_data(self, num_chunks, input_chunk_size,
- plaintext_segment_hasher, crypttext_segment_hasher,
+ crypttext_segment_hasher,
allow_short=False,
previous_chunks=[]):
"""Return a Deferred that will fire when the required number of
if not num_chunks:
return defer.succeed(previous_chunks)
- d = self._uploadable.read(input_chunk_size)
+ d = self._uploadable.read_encrypted(input_chunk_size)
def _got(data):
encrypted_pieces = []
length = 0
- # we use data.pop(0) instead of 'for input_piece in data' to save
- # memory: each piece is destroyed as soon as we're done with it.
while data:
- input_piece = data.pop(0)
- length += len(input_piece)
- plaintext_segment_hasher.update(input_piece)
- self._plaintext_hasher.update(input_piece)
- encrypted_piece = self.cryptor.encrypt(input_piece)
- assert len(encrypted_piece) == len(input_piece)
+ encrypted_piece = data.pop(0)
+ length += len(encrypted_piece)
crypttext_segment_hasher.update(encrypted_piece)
self._crypttext_hasher.update(encrypted_piece)
encrypted_pieces.append(encrypted_piece)
d.addCallback(_got)
d.addCallback(lambda chunks:
self._gather_data(num_chunks-1, input_chunk_size,
- plaintext_segment_hasher,
crypttext_segment_hasher,
allow_short, chunks))
return d
d0.addErrback(_eatNotEnoughPeersError)
return d
- def finish_flat_hashes(self):
- plaintext_hash = self._plaintext_hasher.digest()
+ def finish_hashing(self):
crypttext_hash = self._crypttext_hasher.digest()
- self.uri_extension_data["plaintext_hash"] = plaintext_hash
self.uri_extension_data["crypttext_hash"] = crypttext_hash
+ u = self._uploadable
+ d = u.get_plaintext_hash()
+ def _got(plaintext_hash):
+ self.uri_extension_data["plaintext_hash"] = plaintext_hash
+ return u.get_plaintext_segment_hashtree_nodes(self.num_segments)
+ d.addCallback(_got)
+ def _got_hashtree_nodes(t):
+ self.uri_extension_data["plaintext_root_hash"] = t[0]
+ self._plaintext_hashtree_nodes = t
+ d.addCallback(_got_hashtree_nodes)
+ return d
def send_plaintext_hash_tree_to_all_shareholders(self):
log.msg("%s sending plaintext hash tree" % self)
- t = HashTree(self._plaintext_hashes)
- all_hashes = list(t)
- self.uri_extension_data["plaintext_root_hash"] = t[0]
dl = []
for shareid in self.landlords.keys():
- dl.append(self.send_plaintext_hash_tree(shareid, all_hashes))
+ d = self.send_plaintext_hash_tree(shareid,
+ self._plaintext_hashtree_nodes)
+ dl.append(d)
return self._gather_responses(dl)
def send_plaintext_hash_tree(self, shareid, all_hashes):
def done(self):
log.msg("%s: upload done" % self)
- return self.uri_extension_hash
+ return (self.uri_extension_hash, self.required_shares,
+ self.num_shares, self.file_size)
def err(self, f):
log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
"""
class IEncoder(Interface):
- """I take a file-like object that provides a sequence of bytes and a list
- of shareholders, then encrypt, encode, hash, and deliver shares to those
- shareholders. I will compute all the necessary Merkle hash trees that are
- necessary to validate the data that eventually comes back from the
- shareholders. I provide the root hash of the hash tree, and the encoding
- parameters, both of which must be included in the URI.
+ """I take an object that provides IEncryptedUploadable, which provides
+ encrypted data, and a list of shareholders. I then encode, hash, and
+ deliver shares to those shareholders. I will compute all the necessary
+ Merkle hash trees that are necessary to validate the crypttext that
+ eventually comes back from the shareholders. I provide the URI Extension
+ Block Hash, and the encoding parameters, both of which must be included
+ in the URI.
I do not choose shareholders, that is left to the IUploader. I must be
given a dict of RemoteReferences to storage buckets that are ready and
willing to receive data.
"""
- def setup(infile):
- """I take a file-like object (providing seek, tell, and read) from
- which all the plaintext data that is to be uploaded can be read. I
- will seek to the beginning of the file before reading any data.
- setup() must be called before making any other calls, in particular
- before calling get_reservation_size().
+ def set_size(size):
+ """Specify the number of bytes that will be encoded. This must be
+ peformed before get_serialized_params() can be called.
"""
+ def set_params(params):
+ """Override the default encoding parameters. 'params' is a tuple of
+ (k,d,n), where 'k' is the number of required shares, 'd' is the
+ shares_of_happiness, and 'n' is the total number of shares that will
+ be created.
- def get_share_size():
- """I return the size of the data that will be stored on each
- shareholder. This is aggregate amount of data that will be sent to
- the shareholder, summed over all the put_block() calls I will ever
- make.
+ Encoding parameters can be set in three ways. 1: The Encoder class
+ provides defaults (25/75/100). 2: the Encoder can be constructed with
+ an 'options' dictionary, in which the
+ needed_and_happy_and_total_shares' key can be a (k,d,n) tuple. 3:
+ set_params((k,d,n)) can be called.
- TODO: this might also include some amount of overhead, like the size
- of all the hashes. We need to decide whether this is useful or not.
+ If you intend to use set_params(), you must call it before
+ get_share_size or get_param are called.
+ """
+
+ def set_encrypted_uploadable(u):
+ """Provide a source of encrypted upload data. 'u' must implement
+ IEncryptedUploadable.
+
+ When this is called, the IEncryptedUploadable will be queried for its
+ length and the storage_index that should be used.
- It is useful to determine this size before asking potential
- shareholders whether they will grant a lease or not, since their
- answers will depend upon how much space we need.
+ This returns a Deferred that fires with this Encoder instance.
+
+ This must be performed before start() can be called.
"""
- def get_block_size(): # TODO: can we avoid exposing this?
- """I return the size of the individual blocks that will be delivered
- to a shareholder's put_block() method. By knowing this, the
- shareholder will be able to keep all blocks in a single file and
- still provide random access when reading them.
+ def get_param(name):
+ """Return an encoding parameter, by name.
+
+ 'storage_index': return a string with the (16-byte truncated SHA-256
+ hash) storage index to which these shares should be
+ pushed.
+
+ 'share_counts': return a tuple describing how many shares are used:
+ (needed_shares, shares_of_happiness, total_shares)
+
+ 'num_segments': return an int with the number of segments that
+ will be encoded.
+
+ 'segment_size': return an int with the size of each segment.
+
+ 'block_size': return the size of the individual blocks that will
+ be delivered to a shareholder's put_block() method. By
+ knowing this, the shareholder will be able to keep all
+ blocks in a single file and still provide random access
+ when reading them. # TODO: can we avoid exposing this?
+
+ 'share_size': an int with the size of the data that will be stored
+ on each shareholder. This is aggregate amount of data
+ that will be sent to the shareholder, summed over all
+ the put_block() calls I will ever make. It is useful to
+ determine this size before asking potential
+ shareholders whether they will grant a lease or not,
+ since their answers will depend upon how much space we
+ need. TODO: this might also include some amount of
+ overhead, like the size of all the hashes. We need to
+ decide whether this is useful or not.
+
+ 'serialized_params': a string with a concise description of the
+ codec name and its parameters. This may be passed
+ into the IUploadable to let it make sure that
+ the same file encoded with different parameters
+ will result in different storage indexes.
+
+ Once this is called, set_size() and set_params() may not be called.
"""
def set_shareholders(shareholders):
- """I take a dictionary that maps share identifiers (small integers,
- starting at 0) to RemoteReferences that provide RIBucketWriter. This
- must be called before start().
- """
+ """Tell the encoder where to put the encoded shares. 'shareholders'
+ must be a dictionary that maps share number (an integer ranging from
+ 0 to n-1) to an instance that provides IStorageBucketWriter. This
+ must be performed before start() can be called."""
def start():
- """I start the upload. This process involves reading data from the
- input file, encrypting it, encoding the pieces, uploading the shares
+ """Begin the encode/upload process. This involves reading encrypted
+ data from the IEncryptedUploadable, encoding it, uploading the shares
to the shareholders, then sending the hash trees.
- I return a Deferred that fires with the hash of the uri_extension
- data block.
+ set_encrypted_uploadable() and set_shareholders() must be called
+ before this can be invoked.
+
+ This returns a Deferred that fires with a tuple of
+ (uri_extension_hash, needed_shares, total_shares, size) when the
+ upload process is complete. This information, plus the encryption
+ key, is sufficient to construct the URI.
"""
class IDecoder(Interface):
Returns a Deferred that fires (with the results of target.finish)
when the download is finished, or errbacks if something went wrong."""
+class IEncryptedUploadable(Interface):
+ def get_size():
+ """This behaves just like IUploadable.get_size()."""
+
+ def set_serialized_encoding_parameters(serialized_encoding_parameters):
+ """Tell me what encoding parameters will be used for my data.
+
+ 'serialized_encoding_parameters' is a string which indicates how the
+ data will be encoded (codec name, blocksize, number of shares).
+
+ I may use this when get_storage_index() is called, to influence the
+ index that I return. Or, I may just ignore it.
+
+ set_serialized_encoding_parameters() may be called 0 or 1 times. If
+ called, it must be called before get_storage_index().
+ """
+
+ def get_storage_index():
+ """Return a Deferred that fires with a 16-byte storage index. This
+ value may be influenced by the parameters earlier set by
+ set_serialized_encoding_parameters().
+ """
+
+ def set_segment_size(segment_size):
+ """Set the segment size, to allow the IEncryptedUploadable to
+ accurately create the plaintext segment hash tree. This must be
+ called before any calls to read_encrypted."""
+
+ def read_encrypted(length):
+ """This behaves just like IUploadable.read(), but returns crypttext
+ instead of plaintext. set_segment_size() must be called before the
+ first call to read_encrypted()."""
+
+ def get_plaintext_segment_hashtree_nodes(num_segments):
+ """Get the nodes of a merkle hash tree over the plaintext segments.
+
+ This returns a Deferred which fires with a sequence of hashes. Each
+ hash is a node of a merkle hash tree, generally obtained from::
+
+ tuple(HashTree(segment_hashes))
+
+ 'num_segments' is used to assert that the number of segments that the
+ IEncryptedUploadable handled matches the number of segments that the
+ encoder was expecting.
+ """
+
+ def get_plaintext_hash():
+ """Get the hash of the whole plaintext.
+
+ This returns a Deferred which fires with a tagged SHA-256 hash of the
+ whole plaintext, obtained from hashutil.plaintext_hash(data).
+ """
+
+ def close():
+ """Just like IUploadable.close()."""
+
class IUploadable(Interface):
def get_size():
"""Return a Deferred that will fire with the length of the data to be
used, to compute encoding parameters.
"""
- def get_encryption_key(encoding_parameters):
+ def set_serialized_encoding_parameters(serialized_encoding_parameters):
+ """Tell me what encoding parameters will be used for my data.
+
+ 'serialized_encoding_parameters' is a string which indicates how the
+ data will be encoded (codec name, blocksize, number of shares).
+
+ I may use this when get_encryption_key() is called, to influence the
+ key that I return. Or, I may just ignore it.
+
+ set_serialized_encoding_parameters() may be called 0 or 1 times. If
+ called, it must be called before get_encryption_key().
+ """
+
+ def get_encryption_key():
"""Return a Deferred that fires with a 16-byte AES key. This key will
be used to encrypt the data. The key will also be hashed to derive
- the StorageIndex. 'encoding_parameters' is a string which indicates
- how the data will be encoded (codec name, blocksize, number of
- shares): Uploadables may wish to use these parameters while computing
- the encryption key.
+ the StorageIndex.
Uploadables which want to achieve convergence should hash their file
- contents and the encoding_parameters to form the key (which of course
- requires a full pass over the data). Uploadables can use the
- upload.ConvergentUploadMixin class to achieve this automatically.
+ contents and the serialized_encoding_parameters to form the key
+ (which of course requires a full pass over the data). Uploadables can
+ use the upload.ConvergentUploadMixin class to achieve this
+ automatically.
Uploadables which do not care about convergence (or do not wish to
make multiple passes over the data) can simply return a
strongly-random 16 byte string.
+
+ get_encryption_key() may be called multiple times: the IUploadable is
+ required to return the same value each time.
"""
def read(length):
from twisted.python.failure import Failure
from allmydata import encode, upload, download, hashtree, uri
from allmydata.util import hashutil
+from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
class LostPeerError(Exception):
# force use of multiple segments
options = {"max_segment_size": max_segment_size}
e = encode.Encoder(options)
- e.set_size(datalen)
- e.set_uploadable(upload.Data(data))
- nonkey = "\x00" * 16
- e.set_encryption_key(nonkey)
- e.setup()
- assert e.num_shares == NUM_SHARES # else we'll be completely confused
- assert (NUM_SEGMENTS-1)*e.segment_size < len(data) <= NUM_SEGMENTS*e.segment_size
- shareholders = {}
+ u = upload.Data(data)
+ eu = upload.EncryptAnUploadable(u)
+ d = e.set_encrypted_uploadable(eu)
+
all_shareholders = []
- for shnum in range(NUM_SHARES):
- peer = FakeBucketWriterProxy()
- shareholders[shnum] = peer
- all_shareholders.append(peer)
- e.set_shareholders(shareholders)
- d = e.start()
- def _check(roothash):
- self.failUnless(isinstance(roothash, str))
- self.failUnlessEqual(len(roothash), 32)
+ def _ready(res):
+ k,happy,n = e.get_param("share_counts")
+ _assert(n == NUM_SHARES) # else we'll be completely confused
+ numsegs = e.get_param("num_segments")
+ _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
+ segsize = e.get_param("segment_size")
+ _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
+ NUM_SEGMENTS, segsize,
+ (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)
+
+ shareholders = {}
+ for shnum in range(NUM_SHARES):
+ peer = FakeBucketWriterProxy()
+ shareholders[shnum] = peer
+ all_shareholders.append(peer)
+ e.set_shareholders(shareholders)
+ return e.start()
+ d.addCallback(_ready)
+
+ def _check(res):
+ (uri_extension_hash, required_shares, num_shares, file_size) = res
+ self.failUnless(isinstance(uri_extension_hash, str))
+ self.failUnlessEqual(len(uri_extension_hash), 32)
for i,peer in enumerate(all_shareholders):
self.failUnless(peer.closed)
self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
options = {"max_segment_size": max_segment_size,
"needed_and_happy_and_total_shares": k_and_happy_and_n}
e = encode.Encoder(options)
- e.set_size(len(data))
- e.set_uploadable(upload.Data(data))
- nonkey = "\x00" * 16
- e.set_encryption_key(nonkey)
- e.setup()
- assert e.num_shares == NUM_SHARES # else we'll be completely confused
+ u = upload.Data(data)
+ eu = upload.EncryptAnUploadable(u)
+ d = e.set_encrypted_uploadable(eu)
shareholders = {}
- all_peers = []
- for shnum in range(NUM_SHARES):
- mode = bucket_modes.get(shnum, "good")
- peer = FakeBucketWriterProxy(mode)
- shareholders[shnum] = peer
- e.set_shareholders(shareholders)
-
- d = e.start()
- def _sent(uri_extension_hash):
- return (uri_extension_hash, e, shareholders)
+ def _ready(res):
+ k,happy,n = e.get_param("share_counts")
+ assert n == NUM_SHARES # else we'll be completely confused
+ all_peers = []
+ for shnum in range(NUM_SHARES):
+ mode = bucket_modes.get(shnum, "good")
+ peer = FakeBucketWriterProxy(mode)
+ shareholders[shnum] = peer
+ e.set_shareholders(shareholders)
+ return e.start()
+ d.addCallback(_ready)
+ def _sent(res):
+ d1 = u.get_encryption_key()
+ d1.addCallback(lambda key: (res, key, shareholders))
+ return d1
d.addCallback(_sent)
return d
- def recover(self, (uri_extension_hash, e, shareholders), AVAILABLE_SHARES,
+ def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
recover_mode):
- key = e.key
+ (uri_extension_hash, required_shares, num_shares, file_size) = res
if "corrupt_key" in recover_mode:
# we corrupt the key, so that the decrypted data is corrupted and
# will fail the plaintext hash check. Since we're manually
# attaching shareholders, the fact that the storage index is also
# corrupted doesn't matter.
- key = flip_bit(e.key)
+ key = flip_bit(key)
u = uri.CHKFileURI(key=key,
uri_extension_hash=uri_extension_hash,
- needed_shares=e.required_shares,
- total_shares=e.num_shares,
- size=e.file_size)
+ needed_shares=required_shares,
+ total_shares=num_shares,
+ size=file_size)
URI = u.to_string()
client = None
recover_mode=("corrupt_key"))
def _done(res):
self.failUnless(isinstance(res, Failure))
- self.failUnless(res.check(hashtree.BadHashError))
+ self.failUnless(res.check(hashtree.BadHashError), res)
d.addBoth(_done)
return d
from twisted.application import service
from foolscap import Referenceable
-from allmydata.util import idlib, hashutil
+from allmydata.util import hashutil
from allmydata import encode, storage, hashtree, uri
-from allmydata.interfaces import IUploadable, IUploader
+from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable
+from allmydata.Crypto.Cipher import AES
from cStringIO import StringIO
import collections, random
self.usable_peers.remove(peer)
+class EncryptAnUploadable:
+ """This is a wrapper that takes an IUploadable and provides
+ IEncryptedUploadable."""
+ implements(IEncryptedUploadable)
+
+ def __init__(self, original):
+ self.original = original
+ self._encryptor = None
+ self._plaintext_hasher = hashutil.plaintext_hasher()
+ self._plaintext_segment_hasher = None
+ self._plaintext_segment_hashes = []
+ self._params = None
+
+ def get_size(self):
+ return self.original.get_size()
+
+ def set_serialized_encoding_parameters(self, params):
+ self._params = params
+
+ def _get_encryptor(self):
+ if self._encryptor:
+ return defer.succeed(self._encryptor)
+
+ if self._params is not None:
+ self.original.set_serialized_encoding_parameters(self._params)
+
+ d = self.original.get_encryption_key()
+ def _got(key):
+ e = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
+ self._encryptor = e
+
+ storage_index = hashutil.storage_index_chk_hash(key)
+ assert isinstance(storage_index, str)
+ # There's no point to having the SI be longer than the key, so we
+ # specify that it is truncated to the same 128 bits as the AES key.
+ assert len(storage_index) == 16 # SHA-256 truncated to 128b
+ self._storage_index = storage_index
+
+ return e
+ d.addCallback(_got)
+ return d
+
+ def get_storage_index(self):
+ d = self._get_encryptor()
+ d.addCallback(lambda res: self._storage_index)
+ return d
+
+ def set_segment_size(self, segsize):
+ self._segment_size = segsize
+
+ def _get_segment_hasher(self):
+ p = self._plaintext_segment_hasher
+ if p:
+ left = self._segment_size - self._plaintext_segment_hashed_bytes
+ return p, left
+ p = hashutil.plaintext_segment_hasher()
+ self._plaintext_segment_hasher = p
+ self._plaintext_segment_hashed_bytes = 0
+ return p, self._segment_size
+
+ def _update_segment_hash(self, chunk):
+ offset = 0
+ while offset < len(chunk):
+ p, segment_left = self._get_segment_hasher()
+ chunk_left = len(chunk) - offset
+ this_segment = min(chunk_left, segment_left)
+ p.update(chunk[offset:offset+this_segment])
+ self._plaintext_segment_hashed_bytes += this_segment
+
+ if self._plaintext_segment_hashed_bytes == self._segment_size:
+ # we've filled this segment
+ self._plaintext_segment_hashes.append(p.digest())
+ self._plaintext_segment_hasher = None
+
+ offset += this_segment
+
+ def read_encrypted(self, length):
+ d = self._get_encryptor()
+ d.addCallback(lambda res: self.original.read(length))
+ def _got(data):
+ assert isinstance(data, (tuple, list)), type(data)
+ data = list(data)
+ cryptdata = []
+ # we use data.pop(0) instead of 'for chunk in data' to save
+ # memory: each chunk is destroyed as soon as we're done with it.
+ while data:
+ chunk = data.pop(0)
+ self._plaintext_hasher.update(chunk)
+ self._update_segment_hash(chunk)
+ cryptdata.append(self._encryptor.encrypt(chunk))
+ del chunk
+ return cryptdata
+ d.addCallback(_got)
+ return d
+
+ def get_plaintext_segment_hashtree_nodes(self, num_segments):
+ if len(self._plaintext_segment_hashes) < num_segments:
+ # close out the last one
+ assert len(self._plaintext_segment_hashes) == num_segments-1
+ p, segment_left = self._get_segment_hasher()
+ self._plaintext_segment_hashes.append(p.digest())
+ del self._plaintext_segment_hasher
+ assert len(self._plaintext_segment_hashes) == num_segments
+ ht = hashtree.HashTree(self._plaintext_segment_hashes)
+ return defer.succeed(list(ht))
+
+ def get_plaintext_hash(self):
+ h = self._plaintext_hasher.digest()
+ return defer.succeed(h)
+
+
class CHKUploader:
peer_selector_class = Tahoe3PeerSelector
- def __init__(self, client, uploadable, options={}):
+ def __init__(self, client, options={}):
self._client = client
- self._uploadable = IUploadable(uploadable)
self._options = options
def set_params(self, encoding_parameters):
self._encoding_parameters = encoding_parameters
- needed_shares, shares_of_happiness, total_shares = encoding_parameters
- self.needed_shares = needed_shares
- self.shares_of_happiness = shares_of_happiness
- self.total_shares = total_shares
-
- def start(self):
+ def start(self, uploadable):
"""Start uploading the file.
This method returns a Deferred that will fire with the URI (a
string)."""
- log.msg("starting upload of %s" % self._uploadable)
+ uploadable = IUploadable(uploadable)
+ log.msg("starting upload of %s" % uploadable)
+
+ eu = EncryptAnUploadable(uploadable)
+ d = self.start_encrypted(eu)
+ def _uploaded(res):
+ d1 = uploadable.get_encryption_key()
+ d1.addCallback(lambda key: self._compute_uri(res, key))
+ return d1
+ d.addCallback(_uploaded)
+ return d
- d = self._uploadable.get_size()
- d.addCallback(self.setup_encoder)
- d.addCallback(self._uploadable.get_encryption_key)
- d.addCallback(self.setup_keys)
+ def start_encrypted(self, encrypted):
+ eu = IEncryptedUploadable(encrypted)
+
+ e = encode.Encoder(self._options)
+ e.set_params(self._encoding_parameters)
+ d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders)
- d.addCallback(self.set_shareholders)
- d.addCallback(lambda res: self._encoder.start())
- d.addCallback(self._compute_uri)
+ d.addCallback(self.set_shareholders, e)
+ d.addCallback(lambda res: e.start())
+ # this fires with the uri_extension_hash and other data
return d
- def setup_encoder(self, size):
- self._size = size
- self._encoder = encode.Encoder(self._options)
- self._encoder.set_size(size)
- self._encoder.set_params(self._encoding_parameters)
- self._encoder.set_uploadable(self._uploadable)
- self._encoder.setup()
- return self._encoder.get_serialized_params()
-
- def setup_keys(self, key):
- assert isinstance(key, str)
- assert len(key) == 16 # AES-128
- self._encryption_key = key
- self._encoder.set_encryption_key(key)
- storage_index = hashutil.storage_index_chk_hash(key)
- assert isinstance(storage_index, str)
- # There's no point to having the SI be longer than the key, so we
- # specify that it is truncated to the same 128 bits as the AES key.
- assert len(storage_index) == 16 # SHA-256 truncated to 128b
- self._storage_index = storage_index
- log.msg(" upload storage_index is [%s]" % (idlib.b2a(storage_index,)))
-
-
- def locate_all_shareholders(self, ignored=None):
+ def locate_all_shareholders(self, encoder):
peer_selector = self.peer_selector_class()
- share_size = self._encoder.get_share_size()
- block_size = self._encoder.get_block_size()
- num_segments = self._encoder.get_num_segments()
+
+ storage_index = encoder.get_param("storage_index")
+ share_size = encoder.get_param("share_size")
+ block_size = encoder.get_param("block_size")
+ num_segments = encoder.get_param("num_segments")
+ k,desired,n = encoder.get_param("share_counts")
+
gs = peer_selector.get_shareholders
- d = gs(self._client,
- self._storage_index, share_size, block_size,
- num_segments, self.total_shares, self.shares_of_happiness)
+ d = gs(self._client, storage_index, share_size, block_size,
+ num_segments, n, desired)
return d
- def set_shareholders(self, used_peers):
+ def set_shareholders(self, used_peers, encoder):
"""
@param used_peers: a sequence of PeerTracker objects
"""
for peer in used_peers:
buckets.update(peer.buckets)
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
- self._encoder.set_shareholders(buckets)
+ encoder.set_shareholders(buckets)
- def _compute_uri(self, uri_extension_hash):
- u = uri.CHKFileURI(key=self._encryption_key,
+ def _compute_uri(self, (uri_extension_hash,
+ needed_shares, total_shares, size),
+ key):
+ u = uri.CHKFileURI(key=key,
uri_extension_hash=uri_extension_hash,
- needed_shares=self.needed_shares,
- total_shares=self.total_shares,
- size=self._size,
+ needed_shares=needed_shares,
+ total_shares=total_shares,
+ size=size,
)
- assert u.storage_index == self._storage_index
return u.to_string()
def read_this_many_bytes(uploadable, size, prepend_data=[]):
class LiteralUploader:
- def __init__(self, client, uploadable, options={}):
+ def __init__(self, client, options={}):
self._client = client
- self._uploadable = IUploadable(uploadable)
self._options = options
def set_params(self, encoding_parameters):
pass
- def start(self):
- d = self._uploadable.get_size()
- d.addCallback(lambda size: read_this_many_bytes(self._uploadable, size))
+ def start(self, uploadable):
+ uploadable = IUploadable(uploadable)
+ d = uploadable.get_size()
+ d.addCallback(lambda size: read_this_many_bytes(uploadable, size))
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
d.addCallback(lambda u: u.to_string())
return d
class ConvergentUploadMixin:
# to use this, the class it is mixed in to must have a seekable
# filehandle named self._filehandle
-
- def get_encryption_key(self, encoding_parameters):
- f = self._filehandle
- enckey_hasher = hashutil.key_hasher()
- #enckey_hasher.update(encoding_parameters) # TODO
- f.seek(0)
- BLOCKSIZE = 64*1024
- while True:
- data = f.read(BLOCKSIZE)
- if not data:
- break
- enckey_hasher.update(data)
- enckey = enckey_hasher.digest()[:16]
- f.seek(0)
- return defer.succeed(enckey)
+ _params = None
+ _key = None
+
+ def set_serialized_encoding_parameters(self, params):
+ self._params = params
+ # ignored for now
+
+ def get_encryption_key(self):
+ if self._key is None:
+ f = self._filehandle
+ enckey_hasher = hashutil.key_hasher()
+ #enckey_hasher.update(encoding_parameters) # TODO
+ f.seek(0)
+ BLOCKSIZE = 64*1024
+ while True:
+ data = f.read(BLOCKSIZE)
+ if not data:
+ break
+ enckey_hasher.update(data)
+ f.seek(0)
+ self._key = enckey_hasher.digest()[:16]
+
+ return defer.succeed(self._key)
class NonConvergentUploadMixin:
+ _key = None
+
+ def set_serialized_encoding_parameters(self, params):
+ pass
+
def get_encryption_key(self, encoding_parameters):
- return defer.succeed(os.urandom(16))
+ if self._key is None:
+ self._key = os.urandom(16)
+ return defer.succeed(self._key)
class FileHandle(ConvergentUploadMixin):
uploader_class = self.uploader_class
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader_class = LiteralUploader
- uploader = uploader_class(self.parent, uploadable, options)
+ uploader = uploader_class(self.parent, options)
uploader.set_params(self.parent.get_encoding_parameters()
or self.DEFAULT_ENCODING_PARAMETERS)
- return uploader.start()
+ return uploader.start(uploadable)
d.addCallback(_got_size)
def _done(res):
uploadable.close()