From: Brian Warner <warner@lothar.com>
Date: Fri, 29 Oct 2010 08:20:36 +0000 (-0700)
Subject: fix #1223, crash+inefficiency during repair due to read overrun
X-Git-Url: https://git.rkrishnan.org/pf/content/en/footer/bar.txt?a=commitdiff_plain;h=c18953c169fe9e7cfca75c33474aab0c92968db8;p=tahoe-lafs%2Ftahoe-lafs.git

fix #1223, crash+inefficiency during repair due to read overrun

* repairer (really the uploader) reads beyond end of input file (Uploadable)
* new-downloader does not tolerate overreads
* uploader does lots of tiny reads (inefficient)

This fixes the last two. The uploader still does a single overread at the end
of the input file, but now that's ok so we can leave it in place. The
uploader now expects the Uploadable to behave like a normal disk
file (reading beyond EOF will return less data than was asked for), and now
the new-downloadable behaves that way.
---

diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
index 33c16cfa..04482e63 100644
--- a/src/allmydata/immutable/downloader/node.py
+++ b/src/allmydata/immutable/downloader/node.py
@@ -130,8 +130,9 @@ class DownloadNode:
         # for concurrent operations: each gets its own Segmentation manager
         if size is None:
             size = self._verifycap.size
-        # clip size so offset+size does not go past EOF
-        size = min(size, self._verifycap.size-offset)
+        # ignore overruns: clip size so offset+size does not go past EOF, and
+        # so size is not negative (which indicates that offset >= EOF)
+        size = max(0, min(size, self._verifycap.size-offset))
         if read_ev is None:
             read_ev = self._download_status.add_read_event(offset, size, now())
 
@@ -143,6 +144,10 @@ class DownloadNode:
             sp = self._history.stats_provider
             sp.count("downloader.files_downloaded", 1) # really read() calls
             sp.count("downloader.bytes_downloaded", size)
+        if size == 0:
+            read_ev.finished(now())
+            # no data, so no producer, so no register/unregisterProducer
+            return defer.succeed(consumer)
         s = Segmentation(self, offset, size, consumer, read_ev, lp)
         # this raises an interesting question: what segments to fetch? if
         # offset=0, always fetch the first segment, and then allow
diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py
index c4474486..df7a3a1a 100644
--- a/src/allmydata/immutable/encode.py
+++ b/src/allmydata/immutable/encode.py
@@ -316,6 +316,9 @@ class Encoder(object):
         # of additional shares which can be substituted if the primary ones
         # are unavailable
 
+        # we read data from the source one segment at a time, and then chop
+        # it into 'input_piece_size' pieces before handing it to the codec
+
         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
 
         # memory footprint: we only hold a tiny piece of the plaintext at any
@@ -350,8 +353,7 @@ class Encoder(object):
         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
 
         d = self._gather_data(self.required_shares, input_piece_size,
-                              crypttext_segment_hasher,
-                              allow_short=True)
+                              crypttext_segment_hasher, allow_short=True)
         def _done_gathering(chunks):
             for c in chunks:
                 # a short trailing chunk will have been padded by
@@ -369,58 +371,50 @@ class Encoder(object):
 
     def _gather_data(self, num_chunks, input_chunk_size,
                      crypttext_segment_hasher,
-                     allow_short=False,
-                     previous_chunks=[]):
+                     allow_short=False):
         """Return a Deferred that will fire when the required number of
         chunks have been read (and hashed and encrypted). The Deferred fires
-        with the combination of any 'previous_chunks' and the new chunks
-        which were gathered."""
+        with a list of chunks, each of size input_chunk_size."""
+
+        # I originally built this to allow read_encrypted() to behave badly:
+        # to let it return more or less data than you asked for. It would
+        # stash the leftovers until later, and then recurse until it got
+        # enough. I don't think that was actually useful.
+        #
+        # who defines read_encrypted?
+        #  offloaded.LocalCiphertextReader: real disk file: exact
+        #  upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
+        #    it exact. The return value is a list of 50KiB chunks, to reduce
+        #    the memory footprint of the encryption process.
+        #  repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
+        #
+        # This has been redefined to require read_encrypted() to behave like
+        # a local file: return exactly the amount requested unless it hits
+        # EOF.
+        #  -warner
 
         if self._aborted:
             raise UploadAborted()
 
-        if not num_chunks:
-            return defer.succeed(previous_chunks)
-
-        d = self._uploadable.read_encrypted(input_chunk_size, False)
+        read_size = num_chunks * input_chunk_size
+        d = self._uploadable.read_encrypted(read_size, hash_only=False)
         def _got(data):
+            assert isinstance(data, (list,tuple))
             if self._aborted:
                 raise UploadAborted()
-            encrypted_pieces = []
-            length = 0
-            while data:
-                encrypted_piece = data.pop(0)
-                length += len(encrypted_piece)
-                crypttext_segment_hasher.update(encrypted_piece)
-                self._crypttext_hasher.update(encrypted_piece)
-                encrypted_pieces.append(encrypted_piece)
-
-            precondition(length <= input_chunk_size,
-                         "length=%d > input_chunk_size=%d" %
-                         (length, input_chunk_size))
-            if allow_short:
-                if length < input_chunk_size:
-                    # padding
-                    pad_size = input_chunk_size - length
-                    encrypted_pieces.append('\x00' * pad_size)
-            else:
-                # non-tail segments should be the full segment size
-                if length != input_chunk_size:
-                    log.msg("non-tail segment should be full segment size: %d!=%d"
-                            % (length, input_chunk_size),
-                            level=log.BAD, umid="jNk5Yw")
-                precondition(length == input_chunk_size,
-                             "length=%d != input_chunk_size=%d" %
-                             (length, input_chunk_size))
-
-            encrypted_piece = "".join(encrypted_pieces)
-            return previous_chunks + [encrypted_piece]
-
+            data = "".join(data)
+            precondition(len(data) <= read_size, len(data), read_size)
+            if not allow_short:
+                precondition(len(data) == read_size, len(data), read_size)
+            crypttext_segment_hasher.update(data)
+            self._crypttext_hasher.update(data)
+            if allow_short and len(data) < read_size:
+                # padding
+                data += "\x00" * (read_size - len(data))
+            encrypted_pieces = [data[i:i+input_chunk_size]
+                                for i in range(0, len(data), input_chunk_size)]
+            return encrypted_pieces
         d.addCallback(_got)
-        d.addCallback(lambda chunks:
-                      self._gather_data(num_chunks-1, input_chunk_size,
-                                        crypttext_segment_hasher,
-                                        allow_short, chunks))
         return d
 
     def _send_segment(self, (shares, shareids), segnum):
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index c5a47e12..48094a94 100644
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -1622,7 +1622,11 @@ class IUploadable(Interface):
 
         If the data must be acquired through multiple internal read
         operations, returning a list instead of a single string may help to
-        reduce string copies.
+        reduce string copies. However, the length of the concatenated strings
+        must equal the amount of data requested, unless EOF is encountered.
+        Long reads, or short reads without EOF, are not allowed. read()
+        should return the same amount of data as a local disk file read, just
+        in a different shape and asynchronously.
 
         'length' will typically be equal to (min(get_size(),1MB)/req_shares),
         so a 10kB file means length=3kB, 100kB file means length=30kB,
diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py
index 49c4cff0..942d3272 100644
--- a/src/allmydata/test/test_repairer.py
+++ b/src/allmydata/test/test_repairer.py
@@ -672,6 +672,35 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
         return d
     #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
 
+    def test_tiny_reads(self):
+        # ticket #1223 points out three problems:
+        #   repairer reads beyond end of input file
+        #   new-downloader does not tolerate overreads
+        #   uploader does lots of tiny reads, inefficient
+        self.basedir = "repairer/Repairer/test_tiny_reads"
+        self.set_up_grid()
+        c0 = self.g.clients[0]
+        DATA = "a"*135
+        c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
+        c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
+        d = c0.upload(upload.Data(DATA, convergence=""))
+        def _then(ur):
+            self.uri = ur.uri
+            self.delete_shares_numbered(self.uri, [0])
+            self.c0_filenode = c0.create_node_from_uri(ur.uri)
+            self._stash_counts()
+            return self.c0_filenode.check_and_repair(Monitor())
+        d.addCallback(_then)
+        def _check(ign):
+            (r,a,w) = self._get_delta_counts()
+            # when the uploader (driven by the repairer) does full-segment
+            # reads, this makes 44 server read calls (2*k). Before, when it
+            # was doing input_chunk_size reads (7 bytes), it was doing over
+            # 400.
+            self.failIf(r > 100, "too many reads: %d>100" % r)
+        d.addCallback(_check)
+        return d
+
 
 # XXX extend these tests to show that the checker detects which specific
 # share on which specific server is broken -- this is necessary so that the