From: Brian Warner <warner@allmydata.com>
Date: Fri, 25 Jan 2008 00:25:33 +0000 (-0700)
Subject: offloaded uploader: don't use a huge amount of memory when skipping over previously-uploaded data
X-Git-Tag: allmydata-tahoe-0.8.0~231
X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/file/URI:LIT:krugkidfnzsc4/index.php?a=commitdiff_plain;h=46fe024612424f780e644e13a08f7343fe125d2d;p=tahoe-lafs%2Ftahoe-lafs.git

offloaded uploader: don't use a huge amount of memory when skipping over previously-uploaded data
---
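
Note (illustration only, not part of the patch): the new hash_only= argument
lets a resuming reader consume previously-uploaded ranges without buffering
their ciphertext. The skipped bytes are still read from the IUploadable in
CHUNKSIZE (50kB) pieces and fed through the plaintext hashers, so the hash
trees stay correct, but the redundant ciphertext is discarded instead of
being accumulated in a list. A rough sketch of the calling pattern, using
hypothetical names (eu, current_offset, offset, length) for the bookkeeping:

    from twisted.internet import defer

    def read_from(eu, current_offset, offset, length):
        """Fire with the ciphertext chunks for [offset, offset+length).

        'eu' provides IEncryptedUploadable. Any gap between current_offset
        and offset is consumed with hash_only=True: the plaintext hashers
        still see every byte, but the Deferred fires with an empty list,
        so at most one CHUNKSIZE piece is held in memory at a time.
        """
        assert offset >= current_offset  # seeking backwards is not supported
        skip = offset - current_offset
        if skip:
            d = eu.read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed([])
        d.addCallback(lambda ignored: eu.read_encrypted(length,
                                                        hash_only=False))
        return d

RemoteEncryptedUploadable.remote_read_encrypted (below) does essentially
this, advancing self._offset by the skipped length, since no strings come
back in the hash_only case.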

diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py
index b9946297..1615b00f 100644
--- a/src/allmydata/encode.py
+++ b/src/allmydata/encode.py
@@ -332,7 +332,7 @@ class Encoder(object):
         if not num_chunks:
             return defer.succeed(previous_chunks)
 
-        d = self._uploadable.read_encrypted(input_chunk_size)
+        d = self._uploadable.read_encrypted(input_chunk_size, False)
         def _got(data):
             if self._aborted:
                 raise UploadAborted()
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 0bbd86c1..54a16d72 100644
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -988,9 +988,12 @@ class IEncryptedUploadable(Interface):
         """Return a Deferred that fires with a 16-byte storage index.
         """
 
-    def read_encrypted(length):
+    def read_encrypted(length, hash_only):
         """This behaves just like IUploadable.read(), but returns crypttext
-        instead of plaintext."""
+        instead of plaintext. If hash_only is True, then this discards the
+        data (and returns an empty list); this improves efficiency when
+        resuming an interrupted upload (where we need to compute the
+        plaintext hashes, but don't need the redundant encrypted data)."""
 
     def get_plaintext_hashtree_leaves(first, last, num_segments):
         """Get the leaf nodes of a merkle hash tree over the plaintext
diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py
index 66b7fbc7..584c0c01 100644
--- a/src/allmydata/offloaded.py
+++ b/src/allmydata/offloaded.py
@@ -232,7 +232,10 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         fetch_size = min(needed, self.CHUNK_SIZE)
         if fetch_size == 0:
             return True # all done
-        self.log("fetching %d-%d" % (self._have, self._have+fetch_size),
+        self.log(format="fetching %(start)d-%(end)d of %(total)d",
+                 start=self._have,
+                 end=self._have+fetch_size,
+                 total=self._expected_size,
                  level=log.NOISY)
         d = self.call("read_encrypted", self._have, fetch_size)
         def _got_data(ciphertext_v):
@@ -286,7 +289,8 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
     def get_storage_index(self):
         return defer.succeed(self._storage_index)
 
-    def read_encrypted(self, length):
+    def read_encrypted(self, length, hash_only):
+        assert hash_only is False
         d = defer.maybeDeferred(self.f.read, length)
         d.addCallback(lambda data: [data])
         return d
diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index 04880a8a..51a4c118 100644
--- a/src/allmydata/upload.py
+++ b/src/allmydata/upload.py
@@ -328,9 +328,10 @@ class EncryptAnUploadable:
     """This is a wrapper that takes an IUploadable and provides
     IEncryptedUploadable."""
     implements(IEncryptedUploadable)
+    CHUNKSIZE = 50*1000
 
     def __init__(self, original, default_encoding_parameters):
-        self.original = original
+        self.original = IUploadable(original)
         assert isinstance(default_encoding_parameters, dict)
         self._default_encoding_parameters = default_encoding_parameters
         self._encryptor = None
@@ -451,32 +452,50 @@ class EncryptAnUploadable:
 
             offset += this_segment
 
-    def read_encrypted(self, length):
+    def read_encrypted(self, length, hash_only):
         # make sure our parameters have been set up first
         d = self.get_all_encoding_parameters()
         d.addCallback(lambda ignored: self._get_encryptor())
         # then fetch the plaintext
-        d.addCallback(lambda ignored: self.original.read(length))
-        # and encrypt it..
-        # through the fields we go, hashing all the way, sHA! sHA! sHA!
-        def _got(data):
-            assert isinstance(data, (tuple, list)), type(data)
-            data = list(data)
-            cryptdata = []
-            # we use data.pop(0) instead of 'for chunk in data' to save
-            # memory: each chunk is destroyed as soon as we're done with it.
-            while data:
-                chunk = data.pop(0)
-                log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk),
-                        level=log.NOISY)
-                self._plaintext_hasher.update(chunk)
-                self._update_segment_hash(chunk)
-                cryptdata.append(self._encryptor.process(chunk))
-                del chunk
-            return cryptdata
-        d.addCallback(_got)
+        remaining = length
+        ciphertext = []
+        while remaining:
+            # tolerate large length= values without consuming a lot of RAM
+            chunksize = min(remaining, self.CHUNKSIZE)
+            remaining -= chunksize
+            d.addCallback(lambda ignored, chunksize=chunksize: self.original.read(chunksize))
+            # and encrypt it..
+            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
+            d.addCallback(self._hash_and_encrypt_plaintext, hash_only)
+            d.addCallback(ciphertext.extend)
+        d.addCallback(lambda res: ciphertext)
         return d
 
+    def _hash_and_encrypt_plaintext(self, data, hash_only):
+        assert isinstance(data, (tuple, list)), type(data)
+        data = list(data)
+        cryptdata = []
+        # we use data.pop(0) instead of 'for chunk in data' to save
+        # memory: each chunk is destroyed as soon as we're done with it.
+        while data:
+            chunk = data.pop(0)
+            log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk),
+                    level=log.NOISY)
+            self._plaintext_hasher.update(chunk)
+            self._update_segment_hash(chunk)
+            # TODO: we must still encrypt the data even when hash_only==True,
+            # because pycryptopp's AES-CTR implementation doesn't offer a way
+            # to set the counter value directly. Once it does, just advance
+            # the counter here and skip _encryptor.process() for hash_only.
+            ciphertext = self._encryptor.process(chunk)
+            if hash_only:
+                log.msg("  skipping encryption", level=log.NOISY)
+            else:
+                cryptdata.append(ciphertext)
+            del ciphertext
+            del chunk
+        return cryptdata
+
     def get_plaintext_hashtree_leaves(self, first, last, num_segments):
         if len(self._plaintext_segment_hashes) < num_segments:
             # close out the last one
@@ -650,6 +669,18 @@ class RemoteEncryptedUploadable(Referenceable):
     def remote_get_all_encoding_parameters(self):
         return self._eu.get_all_encoding_parameters()
 
+    def _read_encrypted(self, length, hash_only):
+        d = self._eu.read_encrypted(length, hash_only)
+        def _read(strings):
+            if hash_only:
+                self._offset += length
+            else:
+                size = sum([len(data) for data in strings])
+                self._offset += size
+            return strings
+        d.addCallback(_read)
+        return d
+
     def remote_read_encrypted(self, offset, length):
         # we don't support seek backwards, but we allow skipping forwards
         precondition(offset >= 0, offset)
@@ -662,25 +693,25 @@ class RemoteEncryptedUploadable(Referenceable):
             skip = offset - self._offset
             log.msg("remote_read_encrypted skipping ahead to %d, skip=%d" %
                     (self._offset, skip), level=log.UNUSUAL, parent=lp)
-            d = self.remote_read_encrypted(self._offset, skip)
-            def _ignore(strings):
-                size = sum([len(data) for data in strings])
-                self._bytes_sent -= size
-                return self.remote_read_encrypted(offset, length)
-            d.addCallback(_ignore)
-            return d
+            d = self._read_encrypted(skip, hash_only=True)
+        else:
+            d = defer.succeed(None)
+
+        def _at_correct_offset(res):
+            assert offset == self._offset, "%d != %d" % (offset, self._offset)
+            if self._cutoff is not None and offset+length > self._cutoff:
+                self._cutoff_cb()
+
+            return self._read_encrypted(length, hash_only=False)
+        d.addCallback(_at_correct_offset)
 
-        assert offset == self._offset, "%d != %d" % (offset, self._offset)
-        if self._cutoff is not None and offset+length > self._cutoff:
-            self._cutoff_cb()
-        d = self._eu.read_encrypted(length)
         def _read(strings):
             size = sum([len(data) for data in strings])
             self._bytes_sent += size
-            self._offset += size
             return strings
         d.addCallback(_read)
         return d
+
     def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
         log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                 (first, last-1, num_segments),