From: Brian Warner Date: Thu, 7 Jun 2007 20:15:58 +0000 (-0700) Subject: encode/download: reduce memory footprint by deleting large intermediate buffers as... X-Git-Tag: allmydata-tahoe-0.3.0~20 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri//%22%22?a=commitdiff_plain;h=b2caf7fb9aa9bf6ba0a05a31ae82ab2e122d7303;p=tahoe-lafs%2Ftahoe-lafs.git encode/download: reduce memory footprint by deleting large intermediate buffers as soon as possible, improve hash tree usage --- diff --git a/src/allmydata/download.py b/src/allmydata/download.py index db0c1432..f4c6fbee 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -9,6 +9,7 @@ from allmydata.util import idlib, mathutil, bencode from allmydata.util.assertutil import _assert from allmydata import codec, hashtree from allmydata.Crypto.Cipher import AES +from allmydata.Crypto.Hash import SHA256 from allmydata.uri import unpack_uri from allmydata.interfaces import IDownloadTarget, IDownloader @@ -34,6 +35,7 @@ class Output: self._verifierid_hasher = sha.new(netstring("allmydata_verifierid_v1")) self._fileid_hasher = sha.new(netstring("allmydata_fileid_v1")) self.length = 0 + self._segment_number = 0 self._plaintext_hash_tree = None self._crypttext_hash_tree = None @@ -44,11 +46,34 @@ class Output: def open(self): self.downloadable.open() - def write(self, crypttext): + def write_segment(self, crypttext): self.length += len(crypttext) + + # memory footprint: 'crypttext' is the only segment_size usage + # outstanding. While we decrypt it into 'plaintext', we hit + # 2*segment_size. self._verifierid_hasher.update(crypttext) + if self._crypttext_hash_tree: + ch = SHA256.new(netstring("allmydata_crypttext_segment_v1")) + ch.update(crypttext) + crypttext_leaves = {self._segment_number: ch.digest()} + self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves) + plaintext = self._decryptor.decrypt(crypttext) + del crypttext + + # now we're back down to 1*segment_size. + self._fileid_hasher.update(plaintext) + if self._plaintext_hash_tree: + ph = SHA256.new(netstring("allmydata_plaintext_segment_v1")) + ph.update(plaintext) + plaintext_leaves = {self._segment_number: ph.digest()} + self._plaintext_hash_tree.set_hashes(leaves=plaintext_leaves) + + self._segment_number += 1 + # We're still at 1*segment_size. The Downloadable is responsible for + # any memory usage beyond this. self.downloadable.write(plaintext) def close(self): @@ -458,13 +483,28 @@ class FileDownloader: return d def _download_segment(self, res, segnum): + # memory footprint: when the SegmentDownloader finishes pulling down + # all shares, we have 1*segment_size of usage. segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares) d = segmentdler.start() + # while the codec does its job, we hit 2*segment_size d.addCallback(lambda (shares, shareids): self._codec.decode(shares, shareids)) - def _done(res): - for buf in res: - self._output.write(buf) + # once the codec is done, we drop back to 1*segment_size, because + # 'shares' goes out of scope. The memory usage is all in the + # plaintext now, spread out into a bunch of tiny buffers. + def _done(buffers): + # we start by joining all these buffers together into a single + # string. This makes Output.write easier, since it wants to hash + # data one segment at a time anyways, and doesn't impact our + # memory footprint since we're already peaking at 2*segment_size + # inside the codec a moment ago. + segment = "".join(buffers) + del buffers + # we're down to 1*segment_size right now, but write_segment() + # will decrypt a copy of the segment internally, which will push + # us up to 2*segment_size while it runs. + self._output.write_segment(segment) d.addCallback(_done) return d @@ -473,14 +513,16 @@ class FileDownloader: d = segmentdler.start() d.addCallback(lambda (shares, shareids): self._tail_codec.decode(shares, shareids)) - def _done(res): + def _done(buffers): # trim off any padding added by the upload side - data = ''.join(res) + segment = "".join(buffers) + del buffers # we never send empty segments. If the data was an exact multiple # of the segment size, the last segment will be full. pad_size = mathutil.pad_size(self._size, self._segment_size) tail_size = self._segment_size - pad_size - self._output.write(data[:tail_size]) + segment = segment[:tail_size] + self._output.write_segment(segment) d.addCallback(_done) return d diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index 61a0c5a0..c7dcb3ae 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -226,18 +226,29 @@ class Encoder(object): plaintext_hasher = SHA256.new(netstring("allmydata_plaintext_segment_v1")) crypttext_hasher = SHA256.new(netstring("allmydata_crypttext_segment_v1")) + # memory footprint: we only hold a tiny piece of the plaintext at any + # given time. We build up a segment's worth of cryptttext, then hand + # it to the encoder. Assuming 25-of-100 encoding (4x expansion) and + # 2MiB max_segment_size, we get a peak memory footprint of 5*2MiB = + # 10MiB. Lowering max_segment_size to, say, 100KiB would drop the + # footprint to 500KiB at the expense of more hash-tree overhead. + for i in range(self.required_shares): input_piece = self.infile.read(input_piece_size) # non-tail segments should be the full segment size assert len(input_piece) == input_piece_size plaintext_hasher.update(input_piece) encrypted_piece = self.cryptor.encrypt(input_piece) + assert len(encrypted_piece) == len(input_piece) crypttext_hasher.update(encrypted_piece) + chunks.append(encrypted_piece) self._plaintext_hashes.append(plaintext_hasher.digest()) self._crypttext_hashes.append(crypttext_hasher.digest()) - d = codec.encode(chunks) + + d = codec.encode(chunks) # during this call, we hit 5*segsize memory + del chunks d.addCallback(self._encoded_segment, segnum) return d @@ -252,17 +263,22 @@ class Encoder(object): for i in range(self.required_shares): input_piece = self.infile.read(input_piece_size) plaintext_hasher.update(input_piece) - if len(input_piece) < input_piece_size: - # padding - input_piece += ('\x00' * (input_piece_size - len(input_piece))) encrypted_piece = self.cryptor.encrypt(input_piece) + assert len(encrypted_piece) == len(input_piece) crypttext_hasher.update(encrypted_piece) + + if len(encrypted_piece) < input_piece_size: + # padding + pad_size = (input_piece_size - len(encrypted_piece)) + encrypted_piece += ('\x00' * pad_size) + chunks.append(encrypted_piece) self._plaintext_hashes.append(plaintext_hasher.digest()) self._crypttext_hashes.append(crypttext_hasher.digest()) d = codec.encode(chunks) + del chunks d.addCallback(self._encoded_segment, segnum) return d