From: Brian Warner Date: Thu, 5 Aug 2010 18:45:49 +0000 (-0700) Subject: DownloadNode: fix lost-progress in fetch_failed, tolerate cancel when no segment... X-Git-Tag: allmydata-tahoe-1.8.0b2~8 X-Git-Url: https://git.rkrishnan.org/vdrive/listings/?a=commitdiff_plain;h=f6f9a97627d210a66a1ffe080d347943e4f41d48;p=tahoe-lafs%2Ftahoe-lafs.git DownloadNode: fix lost-progress in fetch_failed, tolerate cancel when no segment-fetch is active. Fixes #1154. The lost-progress bug occurred when two simultanous read() calls fetched different segments, and the first one failed (due to corruption, or the other bugs in #1154): the second read() would never complete. While in this state, cancelling the second read by having its consumer call stopProducing) would trigger the cancel-intolerance bug. Finally, in downloader.node.Cancel, prevent late cancels by adding an 'active' flag --- diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py index 2991c9ee..4c92dd84 100644 --- a/src/allmydata/immutable/downloader/node.py +++ b/src/allmydata/immutable/downloader/node.py @@ -20,10 +20,10 @@ from common import BadCiphertextHashError class Cancel: def __init__(self, f): self._f = f - self.cancelled = False + self.active = True def cancel(self): - if not self.cancelled: - self.cancelled = True + if self.active: + self.active = False self._f(self) class DownloadNode: @@ -360,10 +360,11 @@ class DownloadNode: def fetch_failed(self, sf, f): assert sf is self._active_segment - self._active_segment = None # deliver error upwards for (d,c) in self._extract_requests(sf.segnum): eventually(self._deliver, d, c, f) + self._active_segment = None + self._start_new_segment() def process_blocks(self, segnum, blocks): d = defer.maybeDeferred(self._decode_blocks, segnum, blocks) @@ -449,7 +450,8 @@ class DownloadNode: def _deliver(self, d, c, result): # this method exists to handle cancel() that occurs between # _got_segment and _deliver - if not c.cancelled: + if c.active: + c.active = False # it is now too late to cancel d.callback(result) # might actually be an errback def _extract_requests(self, segnum): @@ -465,7 +467,9 @@ class DownloadNode: self._segment_requests = [t for t in self._segment_requests if t[2] != c] segnums = [segnum for (segnum,d,c) in self._segment_requests] - if self._active_segment.segnum not in segnums: + # self._active_segment might be None in rare circumstances, so make + # sure we tolerate it + if self._active_segment and self._active_segment.segnum not in segnums: self._active_segment.stop() self._active_segment = None self._start_new_segment() diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index fa42b247..7a26a000 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -486,6 +486,113 @@ class DownloadTest(_Base, unittest.TestCase): d.addCallback(_done) return d + def test_simultaneous_onefails_onecancelled(self): + # This exercises an mplayer behavior in ticket #1154. I believe that + # mplayer made two simultaneous webapi GET requests: first one for an + # index region at the end of the (mp3/video) file, then one for the + # first block of the file (the order doesn't really matter). All GETs + # failed (NoSharesError) because of the type(__len__)==long bug. Each + # GET submitted a DownloadNode.get_segment() request, which was + # queued by the DN (DN._segment_requests), so the second one was + # blocked waiting on the first one. When the first one failed, + # DN.fetch_failed() was invoked, which errbacks the first GET, but + # left the other one hanging (the lost-progress bug mentioned in + # #1154 comment 10) + # + # Then mplayer sees that the index region GET failed, so it cancels + # the first-block GET (by closing the HTTP request), triggering + # stopProducer. The second GET was waiting in the Deferred (between + # n.get_segment() and self._request_retired), so its + # _cancel_segment_request was active, so was invoked. However, + # DN._active_segment was None since it was not working on any segment + # at that time, hence the error in #1154. + + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, so we can catch the download + # in the middle. Tell the downloader, so it can guess correctly. + u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs + d = self.c0.upload(u) + def _uploaded(ur): + # corrupt all the shares so the download will fail + def _corruptor(s, debug=False): + which = 48 # first byte of block0 + return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] + self.corrupt_all_shares(ur.uri, _corruptor) + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._maybe_create_download_node() + n._cnode._node._build_guessed_tables(u.max_segment_size) + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = n.read(con1, 0L, 20) + d2 = n.read(con2, 140L, 20) + # con2 will be cancelled, so d2 should fail with DownloadStopped + def _con2_should_not_succeed(res): + self.fail("the second read should not have succeeded") + def _con2_failed(f): + self.failUnless(f.check(DownloadStopped)) + d2.addCallbacks(_con2_should_not_succeed, _con2_failed) + + def _con1_should_not_succeed(res): + self.fail("the first read should not have succeeded") + def _con1_failed(f): + self.failUnless(f.check(NotEnoughSharesError)) + con2.producer.stopProducing() + return d2 + d.addCallbacks(_con1_should_not_succeed, _con1_failed) + return d + d.addCallback(_uploaded) + return d + + def test_simultaneous_onefails(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, so we can catch the download + # in the middle. Tell the downloader, so it can guess correctly. + u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs + d = self.c0.upload(u) + def _uploaded(ur): + # corrupt all the shares so the download will fail + def _corruptor(s, debug=False): + which = 48 # first byte of block0 + return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] + self.corrupt_all_shares(ur.uri, _corruptor) + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._maybe_create_download_node() + n._cnode._node._build_guessed_tables(u.max_segment_size) + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = n.read(con1, 0L, 20) + d2 = n.read(con2, 140L, 20) + # con2 should wait for con1 to fail and then con2 should succeed. + # In particular, we should not lose progress. If this test fails, + # it will fail with a timeout error. + def _con2_should_succeed(res): + # this should succeed because we only corrupted the first + # segment of each share. The segment that holds [140:160] is + # fine, as are the hash chains and UEB. + self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) + d2.addCallback(_con2_should_succeed) + + def _con1_should_not_succeed(res): + self.fail("the first read should not have succeeded") + def _con1_failed(f): + self.failUnless(f.check(NotEnoughSharesError)) + # we *don't* cancel the second one here: this exercises a + # lost-progress bug from #1154. We just wait for it to + # succeed. + return d2 + d.addCallbacks(_con1_should_not_succeed, _con1_failed) + return d + d.addCallback(_uploaded) + return d + def test_download_no_overrun(self): self.basedir = self.mktemp() self.set_up_grid() @@ -599,7 +706,7 @@ class DownloadTest(_Base, unittest.TestCase): return d def test_stop(self): - # use a download targetthat does an immediate stop (ticket #473) + # use a download target that stops after the first segment (#473) self.basedir = self.mktemp() self.set_up_grid() self.c0 = self.g.clients[0] @@ -611,6 +718,36 @@ class DownloadTest(_Base, unittest.TestCase): n.read, c) return d + def test_stop_immediately(self): + # and a target that stops right after registerProducer (maybe #1154) + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + + c = ImmediatelyStoppingConsumer() # stops after registerProducer + d = self.shouldFail(DownloadStopped, "test_stop_immediately", + "our Consumer called stopProducing()", + n.read, c) + return d + + def test_stop_immediately2(self): + # and a target that stops right after registerProducer (maybe #1154) + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + + c = MemoryConsumer() + d0 = n.read(c) + c.producer.stopProducing() + d = self.shouldFail(DownloadStopped, "test_stop_immediately", + "our Consumer called stopProducing()", + lambda: d0) + return d + def test_download_segment_bad_ciphertext_hash(self): # The crypttext_hash_tree asserts the integrity of the decoded # ciphertext, and exists to detect two sorts of problems. The first @@ -776,6 +913,11 @@ class StoppingConsumer(PausingConsumer): def write(self, data): self.producer.stopProducing() +class ImmediatelyStoppingConsumer(MemoryConsumer): + def registerProducer(self, p, streaming): + MemoryConsumer.registerProducer(self, p, streaming) + self.producer.stopProducing() + class StallingConsumer(MemoryConsumer): def __init__(self, halfway_cb): MemoryConsumer.__init__(self)