From f6f9a97627d210a66a1ffe080d347943e4f41d48 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Thu, 5 Aug 2010 11:45:49 -0700
Subject: [PATCH] DownloadNode: fix lost-progress in fetch_failed, tolerate
 cancel when no segment-fetch is active. Fixes #1154.

The lost-progress bug occurred when two simultanous read() calls fetched
different segments, and the first one failed (due to corruption, or the other
bugs in #1154): the second read() would never complete. While in this state,
cancelling the second read by having its consumer call stopProducing) would
trigger the cancel-intolerance bug. Finally, in downloader.node.Cancel,
prevent late cancels by adding an 'active' flag
---
 src/allmydata/immutable/downloader/node.py |  16 ++-
 src/allmydata/test/test_download.py        | 144 ++++++++++++++++++++-
 2 files changed, 153 insertions(+), 7 deletions(-)

diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
index 2991c9ee..4c92dd84 100644
--- a/src/allmydata/immutable/downloader/node.py
+++ b/src/allmydata/immutable/downloader/node.py
@@ -20,10 +20,10 @@ from common import BadCiphertextHashError
 class Cancel:
     def __init__(self, f):
         self._f = f
-        self.cancelled = False
+        self.active = True
     def cancel(self):
-        if not self.cancelled:
-            self.cancelled = True
+        if self.active:
+            self.active = False
             self._f(self)
 
 class DownloadNode:
@@ -360,10 +360,11 @@ class DownloadNode:
 
     def fetch_failed(self, sf, f):
         assert sf is self._active_segment
-        self._active_segment = None
         # deliver error upwards
         for (d,c) in self._extract_requests(sf.segnum):
             eventually(self._deliver, d, c, f)
+        self._active_segment = None
+        self._start_new_segment()
 
     def process_blocks(self, segnum, blocks):
         d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
@@ -449,7 +450,8 @@ class DownloadNode:
     def _deliver(self, d, c, result):
         # this method exists to handle cancel() that occurs between
         # _got_segment and _deliver
-        if not c.cancelled:
+        if c.active:
+            c.active = False # it is now too late to cancel
             d.callback(result) # might actually be an errback
 
     def _extract_requests(self, segnum):
@@ -465,7 +467,9 @@ class DownloadNode:
         self._segment_requests = [t for t in self._segment_requests
                                   if t[2] != c]
         segnums = [segnum for (segnum,d,c) in self._segment_requests]
-        if self._active_segment.segnum not in segnums:
+        # self._active_segment might be None in rare circumstances, so make
+        # sure we tolerate it
+        if self._active_segment and self._active_segment.segnum not in segnums:
             self._active_segment.stop()
             self._active_segment = None
             self._start_new_segment()
diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py
index fa42b247..7a26a000 100644
--- a/src/allmydata/test/test_download.py
+++ b/src/allmydata/test/test_download.py
@@ -486,6 +486,113 @@ class DownloadTest(_Base, unittest.TestCase):
         d.addCallback(_done)
         return d
 
+    def test_simultaneous_onefails_onecancelled(self):
+        # This exercises an mplayer behavior in ticket #1154. I believe that
+        # mplayer made two simultaneous webapi GET requests: first one for an
+        # index region at the end of the (mp3/video) file, then one for the
+        # first block of the file (the order doesn't really matter). All GETs
+        # failed (NoSharesError) because of the type(__len__)==long bug. Each
+        # GET submitted a DownloadNode.get_segment() request, which was
+        # queued by the DN (DN._segment_requests), so the second one was
+        # blocked waiting on the first one. When the first one failed,
+        # DN.fetch_failed() was invoked, which errbacks the first GET, but
+        # left the other one hanging (the lost-progress bug mentioned in
+        # #1154 comment 10)
+        #
+        # Then mplayer sees that the index region GET failed, so it cancels
+        # the first-block GET (by closing the HTTP request), triggering
+        # stopProducer. The second GET was waiting in the Deferred (between
+        # n.get_segment() and self._request_retired), so its
+        # _cancel_segment_request was active, so was invoked. However,
+        # DN._active_segment was None since it was not working on any segment
+        # at that time, hence the error in #1154.
+
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle. Tell the downloader, so it can guess correctly.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            # corrupt all the shares so the download will fail
+            def _corruptor(s, debug=False):
+                which = 48 # first byte of block0
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            self.corrupt_all_shares(ur.uri, _corruptor)
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            con1 = MemoryConsumer()
+            con2 = MemoryConsumer()
+            d = n.read(con1, 0L, 20)
+            d2 = n.read(con2, 140L, 20)
+            # con2 will be cancelled, so d2 should fail with DownloadStopped
+            def _con2_should_not_succeed(res):
+                self.fail("the second read should not have succeeded")
+            def _con2_failed(f):
+                self.failUnless(f.check(DownloadStopped))
+            d2.addCallbacks(_con2_should_not_succeed, _con2_failed)
+
+            def _con1_should_not_succeed(res):
+                self.fail("the first read should not have succeeded")
+            def _con1_failed(f):
+                self.failUnless(f.check(NotEnoughSharesError))
+                con2.producer.stopProducing()
+                return d2
+            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
+    def test_simultaneous_onefails(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle. Tell the downloader, so it can guess correctly.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            # corrupt all the shares so the download will fail
+            def _corruptor(s, debug=False):
+                which = 48 # first byte of block0
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            self.corrupt_all_shares(ur.uri, _corruptor)
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            con1 = MemoryConsumer()
+            con2 = MemoryConsumer()
+            d = n.read(con1, 0L, 20)
+            d2 = n.read(con2, 140L, 20)
+            # con2 should wait for con1 to fail and then con2 should succeed.
+            # In particular, we should not lose progress. If this test fails,
+            # it will fail with a timeout error.
+            def _con2_should_succeed(res):
+                # this should succeed because we only corrupted the first
+                # segment of each share. The segment that holds [140:160] is
+                # fine, as are the hash chains and UEB.
+                self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
+            d2.addCallback(_con2_should_succeed)
+
+            def _con1_should_not_succeed(res):
+                self.fail("the first read should not have succeeded")
+            def _con1_failed(f):
+                self.failUnless(f.check(NotEnoughSharesError))
+                # we *don't* cancel the second one here: this exercises a
+                # lost-progress bug from #1154. We just wait for it to
+                # succeed.
+                return d2
+            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
     def test_download_no_overrun(self):
         self.basedir = self.mktemp()
         self.set_up_grid()
@@ -599,7 +706,7 @@ class DownloadTest(_Base, unittest.TestCase):
         return d
 
     def test_stop(self):
-        # use a download targetthat does an immediate stop (ticket #473)
+        # use a download target that stops after the first segment (#473)
         self.basedir = self.mktemp()
         self.set_up_grid()
         self.c0 = self.g.clients[0]
@@ -611,6 +718,36 @@ class DownloadTest(_Base, unittest.TestCase):
                             n.read, c)
         return d
 
+    def test_stop_immediately(self):
+        # and a target that stops right after registerProducer (maybe #1154)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = ImmediatelyStoppingConsumer() # stops after registerProducer
+        d = self.shouldFail(DownloadStopped, "test_stop_immediately",
+                            "our Consumer called stopProducing()",
+                            n.read, c)
+        return d
+
+    def test_stop_immediately2(self):
+        # and a target that stops right after registerProducer (maybe #1154)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = MemoryConsumer()
+        d0 = n.read(c)
+        c.producer.stopProducing()
+        d = self.shouldFail(DownloadStopped, "test_stop_immediately",
+                            "our Consumer called stopProducing()",
+                            lambda: d0)
+        return d
+
     def test_download_segment_bad_ciphertext_hash(self):
         # The crypttext_hash_tree asserts the integrity of the decoded
         # ciphertext, and exists to detect two sorts of problems. The first
@@ -776,6 +913,11 @@ class StoppingConsumer(PausingConsumer):
     def write(self, data):
         self.producer.stopProducing()
 
+class ImmediatelyStoppingConsumer(MemoryConsumer):
+    def registerProducer(self, p, streaming):
+        MemoryConsumer.registerProducer(self, p, streaming)
+        self.producer.stopProducing()
+
 class StallingConsumer(MemoryConsumer):
     def __init__(self, halfway_cb):
         MemoryConsumer.__init__(self)
-- 
2.45.2