From: Brian Warner Date: Fri, 25 Jan 2008 00:25:33 +0000 (-0700) Subject: offloaded uploader: don't use a huge amount of memory when skipping over previously... X-Git-Tag: allmydata-tahoe-0.8.0~231 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/rgr-080307.php?a=commitdiff_plain;h=46fe024612424f780e644e13a08f7343fe125d2d;p=tahoe-lafs%2Ftahoe-lafs.git offloaded uploader: don't use a huge amount of memory when skipping over previously-uploaded data --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index b9946297..1615b00f 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -332,7 +332,7 @@ class Encoder(object): if not num_chunks: return defer.succeed(previous_chunks) - d = self._uploadable.read_encrypted(input_chunk_size) + d = self._uploadable.read_encrypted(input_chunk_size, False) def _got(data): if self._aborted: raise UploadAborted() diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 0bbd86c1..54a16d72 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -988,9 +988,12 @@ class IEncryptedUploadable(Interface): """Return a Deferred that fires with a 16-byte storage index. """ - def read_encrypted(length): + def read_encrypted(length, hash_only): """This behaves just like IUploadable.read(), but returns crypttext - instead of plaintext.""" + instead of plaintext. If hash_only is True, then this discards the + data (and returns an empty list); this improves efficiency when + resuming an interrupted upload (where we need to compute the + plaintext hashes, but don't need the redundant encrypted data).""" def get_plaintext_hashtree_leaves(first, last, num_segments): """Get the leaf nodes of a merkle hash tree over the plaintext diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 66b7fbc7..584c0c01 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -232,7 +232,10 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): fetch_size = min(needed, self.CHUNK_SIZE) if fetch_size == 0: return True # all done - self.log("fetching %d-%d" % (self._have, self._have+fetch_size), + self.log(format="fetching %(start)d-%(end)d of %(total)d", + start=self._have, + end=self._have+fetch_size, + total=self._expected_size, level=log.NOISY) d = self.call("read_encrypted", self._have, fetch_size) def _got_data(ciphertext_v): @@ -286,7 +289,8 @@ class LocalCiphertextReader(AskUntilSuccessMixin): def get_storage_index(self): return defer.succeed(self._storage_index) - def read_encrypted(self, length): + def read_encrypted(self, length, hash_only): + assert hash_only is False d = defer.maybeDeferred(self.f.read, length) d.addCallback(lambda data: [data]) return d diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 04880a8a..51a4c118 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -328,9 +328,10 @@ class EncryptAnUploadable: """This is a wrapper that takes an IUploadable and provides IEncryptedUploadable.""" implements(IEncryptedUploadable) + CHUNKSIZE = 50*1000 def __init__(self, original, default_encoding_parameters): - self.original = original + self.original = IUploadable(original) assert isinstance(default_encoding_parameters, dict) self._default_encoding_parameters = default_encoding_parameters self._encryptor = None @@ -451,32 +452,50 @@ class EncryptAnUploadable: offset += this_segment - def read_encrypted(self, length): + def read_encrypted(self, length, hash_only): # make sure our parameters have been set up first d = self.get_all_encoding_parameters() d.addCallback(lambda ignored: self._get_encryptor()) # then fetch the plaintext - d.addCallback(lambda ignored: self.original.read(length)) - # and encrypt it.. - # through the fields we go, hashing all the way, sHA! sHA! sHA! - def _got(data): - assert isinstance(data, (tuple, list)), type(data) - data = list(data) - cryptdata = [] - # we use data.pop(0) instead of 'for chunk in data' to save - # memory: each chunk is destroyed as soon as we're done with it. - while data: - chunk = data.pop(0) - log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk), - level=log.NOISY) - self._plaintext_hasher.update(chunk) - self._update_segment_hash(chunk) - cryptdata.append(self._encryptor.process(chunk)) - del chunk - return cryptdata - d.addCallback(_got) + remaining = length + ciphertext = [] + while remaining: + # tolerate large length= values without consuming a lot of RAM + chunksize = min(remaining, self.CHUNKSIZE) + remaining -= chunksize + d.addCallback(lambda ignored: self.original.read(chunksize)) + # and encrypt it.. + # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/' + d.addCallback(self._hash_and_encrypt_plaintext, hash_only) + d.addCallback(ciphertext.extend) + d.addCallback(lambda res: ciphertext) return d + def _hash_and_encrypt_plaintext(self, data, hash_only): + assert isinstance(data, (tuple, list)), type(data) + data = list(data) + cryptdata = [] + # we use data.pop(0) instead of 'for chunk in data' to save + # memory: each chunk is destroyed as soon as we're done with it. + while data: + chunk = data.pop(0) + log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk), + level=log.NOISY) + self._plaintext_hasher.update(chunk) + self._update_segment_hash(chunk) + # TODO: we have to encrypt the data (even if hash_only==True) + # because pycryptopp's AES-CTR implementation doesn't offer a + # way to change the counter value. Once pycryptopp acquires + # this ability, change this to simply update the counter + # before each call to (hash_only==False) _encryptor.process() + ciphertext = self._encryptor.process(chunk) + if not hash_only: + log.msg(" skipping encryption") + cryptdata.append(ciphertext) + del ciphertext + del chunk + return cryptdata + def get_plaintext_hashtree_leaves(self, first, last, num_segments): if len(self._plaintext_segment_hashes) < num_segments: # close out the last one @@ -650,6 +669,18 @@ class RemoteEncryptedUploadable(Referenceable): def remote_get_all_encoding_parameters(self): return self._eu.get_all_encoding_parameters() + def _read_encrypted(self, length, hash_only): + d = self._eu.read_encrypted(length, hash_only) + def _read(strings): + if hash_only: + self._offset += length + else: + size = sum([len(data) for data in strings]) + self._offset += size + return strings + d.addCallback(_read) + return d + def remote_read_encrypted(self, offset, length): # we don't support seek backwards, but we allow skipping forwards precondition(offset >= 0, offset) @@ -662,25 +693,25 @@ class RemoteEncryptedUploadable(Referenceable): skip = offset - self._offset log.msg("remote_read_encrypted skipping ahead to %d, skip=%d" % (self._offset, skip), level=log.UNUSUAL, parent=lp) - d = self.remote_read_encrypted(self._offset, skip) - def _ignore(strings): - size = sum([len(data) for data in strings]) - self._bytes_sent -= size - return self.remote_read_encrypted(offset, length) - d.addCallback(_ignore) - return d + d = self._read_encrypted(skip, hash_only=True) + else: + d = defer.succeed(None) + + def _at_correct_offset(res): + assert offset == self._offset, "%d != %d" % (offset, self._offset) + if self._cutoff is not None and offset+length > self._cutoff: + self._cutoff_cb() + + return self._read_encrypted(length, hash_only=False) + d.addCallback(_at_correct_offset) - assert offset == self._offset, "%d != %d" % (offset, self._offset) - if self._cutoff is not None and offset+length > self._cutoff: - self._cutoff_cb() - d = self._eu.read_encrypted(length) def _read(strings): size = sum([len(data) for data in strings]) self._bytes_sent += size - self._offset += size return strings d.addCallback(_read) return d + def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments): log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" % (first, last-1, num_segments),