]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
encode/download: reduce memory footprint by deleting large intermediate buffers as...
authorBrian Warner <warner@lothar.com>
Thu, 7 Jun 2007 20:15:58 +0000 (13:15 -0700)
committerBrian Warner <warner@lothar.com>
Thu, 7 Jun 2007 20:15:58 +0000 (13:15 -0700)
src/allmydata/download.py
src/allmydata/encode.py

index db0c1432a9bec3b8917d9a06c9719b00a0911bc4..f4c6fbee32cf30e4346c9ddcd71b60a47a05ccfd 100644 (file)
@@ -9,6 +9,7 @@ from allmydata.util import idlib, mathutil, bencode
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree
 from allmydata.Crypto.Cipher import AES
+from allmydata.Crypto.Hash import SHA256
 from allmydata.uri import unpack_uri
 from allmydata.interfaces import IDownloadTarget, IDownloader
 
@@ -34,6 +35,7 @@ class Output:
         self._verifierid_hasher = sha.new(netstring("allmydata_verifierid_v1"))
         self._fileid_hasher = sha.new(netstring("allmydata_fileid_v1"))
         self.length = 0
+        self._segment_number = 0
         self._plaintext_hash_tree = None
         self._crypttext_hash_tree = None
 
@@ -44,11 +46,34 @@ class Output:
     def open(self):
         self.downloadable.open()
 
-    def write(self, crypttext):
+    def write_segment(self, crypttext):
         self.length += len(crypttext)
+
+        # memory footprint: 'crypttext' is the only segment_size usage
+        # outstanding. While we decrypt it into 'plaintext', we hit
+        # 2*segment_size.
         self._verifierid_hasher.update(crypttext)
+        if self._crypttext_hash_tree:
+            ch = SHA256.new(netstring("allmydata_crypttext_segment_v1"))
+            ch.update(crypttext)
+            crypttext_leaves = {self._segment_number: ch.digest()}
+            self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves)
+
         plaintext = self._decryptor.decrypt(crypttext)
+        del crypttext
+
+        # now we're back down to 1*segment_size.
+
         self._fileid_hasher.update(plaintext)
+        if self._plaintext_hash_tree:
+            ph = SHA256.new(netstring("allmydata_plaintext_segment_v1"))
+            ph.update(plaintext)
+            plaintext_leaves = {self._segment_number: ph.digest()}
+            self._plaintext_hash_tree.set_hashes(leaves=plaintext_leaves)
+
+        self._segment_number += 1
+        # We're still at 1*segment_size. The Downloadable is responsible for
+        # any memory usage beyond this.
         self.downloadable.write(plaintext)
 
     def close(self):
@@ -458,13 +483,28 @@ class FileDownloader:
         return d
 
     def _download_segment(self, res, segnum):
+        # memory footprint: when the SegmentDownloader finishes pulling down
+        # all shares, we have 1*segment_size of usage.
         segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares)
         d = segmentdler.start()
+        # while the codec does its job, we hit 2*segment_size
         d.addCallback(lambda (shares, shareids):
                       self._codec.decode(shares, shareids))
-        def _done(res):
-            for buf in res:
-                self._output.write(buf)
+        # once the codec is done, we drop back to 1*segment_size, because
+        # 'shares' goes out of scope. The memory usage is all in the
+        # plaintext now, spread out into a bunch of tiny buffers.
+        def _done(buffers):
+            # we start by joining all these buffers together into a single
+            # string. This makes Output.write easier, since it wants to hash
+            # data one segment at a time anyways, and doesn't impact our
+            # memory footprint since we're already peaking at 2*segment_size
+            # inside the codec a moment ago.
+            segment = "".join(buffers)
+            del buffers
+            # we're down to 1*segment_size right now, but write_segment()
+            # will decrypt a copy of the segment internally, which will push
+            # us up to 2*segment_size while it runs.
+            self._output.write_segment(segment)
         d.addCallback(_done)
         return d
 
@@ -473,14 +513,16 @@ class FileDownloader:
         d = segmentdler.start()
         d.addCallback(lambda (shares, shareids):
                       self._tail_codec.decode(shares, shareids))
-        def _done(res):
+        def _done(buffers):
             # trim off any padding added by the upload side
-            data = ''.join(res)
+            segment = "".join(buffers)
+            del buffers
             # we never send empty segments. If the data was an exact multiple
             # of the segment size, the last segment will be full.
             pad_size = mathutil.pad_size(self._size, self._segment_size)
             tail_size = self._segment_size - pad_size
-            self._output.write(data[:tail_size])
+            segment = segment[:tail_size]
+            self._output.write_segment(segment)
         d.addCallback(_done)
         return d
 
index 61a0c5a0c6b9d5541e020059b96d979149246c93..c7dcb3aee65e315fe794a3e2658206a1e0fdbdc4 100644 (file)
@@ -226,18 +226,29 @@ class Encoder(object):
         plaintext_hasher = SHA256.new(netstring("allmydata_plaintext_segment_v1"))
         crypttext_hasher = SHA256.new(netstring("allmydata_crypttext_segment_v1"))
 
+        # memory footprint: we only hold a tiny piece of the plaintext at any
+        # given time. We build up a segment's worth of cryptttext, then hand
+        # it to the encoder. Assuming 25-of-100 encoding (4x expansion) and
+        # 2MiB max_segment_size, we get a peak memory footprint of 5*2MiB =
+        # 10MiB. Lowering max_segment_size to, say, 100KiB would drop the
+        # footprint to 500KiB at the expense of more hash-tree overhead.
+
         for i in range(self.required_shares):
             input_piece = self.infile.read(input_piece_size)
             # non-tail segments should be the full segment size
             assert len(input_piece) == input_piece_size
             plaintext_hasher.update(input_piece)
             encrypted_piece = self.cryptor.encrypt(input_piece)
+            assert len(encrypted_piece) == len(input_piece)
             crypttext_hasher.update(encrypted_piece)
+
             chunks.append(encrypted_piece)
 
         self._plaintext_hashes.append(plaintext_hasher.digest())
         self._crypttext_hashes.append(crypttext_hasher.digest())
-        d = codec.encode(chunks)
+
+        d = codec.encode(chunks) # during this call, we hit 5*segsize memory
+        del chunks
         d.addCallback(self._encoded_segment, segnum)
         return d
 
@@ -252,17 +263,22 @@ class Encoder(object):
         for i in range(self.required_shares):
             input_piece = self.infile.read(input_piece_size)
             plaintext_hasher.update(input_piece)
-            if len(input_piece) < input_piece_size:
-                # padding
-                input_piece += ('\x00' * (input_piece_size - len(input_piece)))
             encrypted_piece = self.cryptor.encrypt(input_piece)
+            assert len(encrypted_piece) == len(input_piece)
             crypttext_hasher.update(encrypted_piece)
+
+            if len(encrypted_piece) < input_piece_size:
+                # padding
+                pad_size = (input_piece_size - len(encrypted_piece))
+                encrypted_piece += ('\x00' * pad_size)
+
             chunks.append(encrypted_piece)
 
         self._plaintext_hashes.append(plaintext_hasher.digest())
         self._crypttext_hashes.append(crypttext_hasher.digest())
 
         d = codec.encode(chunks)
+        del chunks
         d.addCallback(self._encoded_segment, segnum)
         return d