class Cancel:
def __init__(self, f):
self._f = f
- self.cancelled = False
+ self.active = True
def cancel(self):
- if not self.cancelled:
- self.cancelled = True
+ if self.active:
+ self.active = False
self._f(self)
class DownloadNode:
def fetch_failed(self, sf, f):
assert sf is self._active_segment
- self._active_segment = None
# deliver error upwards
for (d,c) in self._extract_requests(sf.segnum):
eventually(self._deliver, d, c, f)
+ self._active_segment = None
+ self._start_new_segment()
def process_blocks(self, segnum, blocks):
d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
def _deliver(self, d, c, result):
# this method exists to handle cancel() that occurs between
# _got_segment and _deliver
- if not c.cancelled:
+ if c.active:
+ c.active = False # it is now too late to cancel
d.callback(result) # might actually be an errback
def _extract_requests(self, segnum):
self._segment_requests = [t for t in self._segment_requests
if t[2] != c]
segnums = [segnum for (segnum,d,c) in self._segment_requests]
- if self._active_segment.segnum not in segnums:
+ # self._active_segment might be None in rare circumstances, so make
+ # sure we tolerate it
+ if self._active_segment and self._active_segment.segnum not in segnums:
self._active_segment.stop()
self._active_segment = None
self._start_new_segment()
d.addCallback(_done)
return d
+ def test_simultaneous_onefails_onecancelled(self):
+ # This exercises an mplayer behavior in ticket #1154. I believe that
+ # mplayer made two simultaneous webapi GET requests: first one for an
+ # index region at the end of the (mp3/video) file, then one for the
+ # first block of the file (the order doesn't really matter). All GETs
+ # failed (NoSharesError) because of the type(__len__)==long bug. Each
+ # GET submitted a DownloadNode.get_segment() request, which was
+ # queued by the DN (DN._segment_requests), so the second one was
+ # blocked waiting on the first one. When the first one failed,
+ # DN.fetch_failed() was invoked, which errbacks the first GET, but
+ # left the other one hanging (the lost-progress bug mentioned in
+ # #1154 comment 10)
+ #
+ # Then mplayer sees that the index region GET failed, so it cancels
+ # the first-block GET (by closing the HTTP request), triggering
+ # stopProducer. The second GET was waiting in the Deferred (between
+ # n.get_segment() and self._request_retired), so its
+ # _cancel_segment_request was active, so was invoked. However,
+ # DN._active_segment was None since it was not working on any segment
+ # at that time, hence the error in #1154.
+
+ self.basedir = self.mktemp()
+ self.set_up_grid()
+ self.c0 = self.g.clients[0]
+
+ # upload a file with multiple segments, so we can catch the download
+ # in the middle. Tell the downloader, so it can guess correctly.
+ u = upload.Data(plaintext, None)
+ u.max_segment_size = 70 # 5 segs
+ d = self.c0.upload(u)
+ def _uploaded(ur):
+ # corrupt all the shares so the download will fail
+ def _corruptor(s, debug=False):
+ which = 48 # first byte of block0
+ return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+ self.corrupt_all_shares(ur.uri, _corruptor)
+ n = self.c0.create_node_from_uri(ur.uri)
+ n._cnode._maybe_create_download_node()
+ n._cnode._node._build_guessed_tables(u.max_segment_size)
+ con1 = MemoryConsumer()
+ con2 = MemoryConsumer()
+ d = n.read(con1, 0L, 20)
+ d2 = n.read(con2, 140L, 20)
+ # con2 will be cancelled, so d2 should fail with DownloadStopped
+ def _con2_should_not_succeed(res):
+ self.fail("the second read should not have succeeded")
+ def _con2_failed(f):
+ self.failUnless(f.check(DownloadStopped))
+ d2.addCallbacks(_con2_should_not_succeed, _con2_failed)
+
+ def _con1_should_not_succeed(res):
+ self.fail("the first read should not have succeeded")
+ def _con1_failed(f):
+ self.failUnless(f.check(NotEnoughSharesError))
+ con2.producer.stopProducing()
+ return d2
+ d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+ return d
+ d.addCallback(_uploaded)
+ return d
+
+ def test_simultaneous_onefails(self):
+ self.basedir = self.mktemp()
+ self.set_up_grid()
+ self.c0 = self.g.clients[0]
+
+ # upload a file with multiple segments, so we can catch the download
+ # in the middle. Tell the downloader, so it can guess correctly.
+ u = upload.Data(plaintext, None)
+ u.max_segment_size = 70 # 5 segs
+ d = self.c0.upload(u)
+ def _uploaded(ur):
+ # corrupt all the shares so the download will fail
+ def _corruptor(s, debug=False):
+ which = 48 # first byte of block0
+ return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+ self.corrupt_all_shares(ur.uri, _corruptor)
+ n = self.c0.create_node_from_uri(ur.uri)
+ n._cnode._maybe_create_download_node()
+ n._cnode._node._build_guessed_tables(u.max_segment_size)
+ con1 = MemoryConsumer()
+ con2 = MemoryConsumer()
+ d = n.read(con1, 0L, 20)
+ d2 = n.read(con2, 140L, 20)
+ # con2 should wait for con1 to fail and then con2 should succeed.
+ # In particular, we should not lose progress. If this test fails,
+ # it will fail with a timeout error.
+ def _con2_should_succeed(res):
+ # this should succeed because we only corrupted the first
+ # segment of each share. The segment that holds [140:160] is
+ # fine, as are the hash chains and UEB.
+ self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
+ d2.addCallback(_con2_should_succeed)
+
+ def _con1_should_not_succeed(res):
+ self.fail("the first read should not have succeeded")
+ def _con1_failed(f):
+ self.failUnless(f.check(NotEnoughSharesError))
+ # we *don't* cancel the second one here: this exercises a
+ # lost-progress bug from #1154. We just wait for it to
+ # succeed.
+ return d2
+ d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+ return d
+ d.addCallback(_uploaded)
+ return d
+
def test_download_no_overrun(self):
self.basedir = self.mktemp()
self.set_up_grid()
return d
def test_stop(self):
- # use a download targetthat does an immediate stop (ticket #473)
+ # use a download target that stops after the first segment (#473)
self.basedir = self.mktemp()
self.set_up_grid()
self.c0 = self.g.clients[0]
n.read, c)
return d
+ def test_stop_immediately(self):
+ # and a target that stops right after registerProducer (maybe #1154)
+ self.basedir = self.mktemp()
+ self.set_up_grid()
+ self.c0 = self.g.clients[0]
+ self.load_shares()
+ n = self.c0.create_node_from_uri(immutable_uri)
+
+ c = ImmediatelyStoppingConsumer() # stops after registerProducer
+ d = self.shouldFail(DownloadStopped, "test_stop_immediately",
+ "our Consumer called stopProducing()",
+ n.read, c)
+ return d
+
+ def test_stop_immediately2(self):
+ # and a target that stops right after registerProducer (maybe #1154)
+ self.basedir = self.mktemp()
+ self.set_up_grid()
+ self.c0 = self.g.clients[0]
+ self.load_shares()
+ n = self.c0.create_node_from_uri(immutable_uri)
+
+ c = MemoryConsumer()
+ d0 = n.read(c)
+ c.producer.stopProducing()
+ d = self.shouldFail(DownloadStopped, "test_stop_immediately",
+ "our Consumer called stopProducing()",
+ lambda: d0)
+ return d
+
def test_download_segment_bad_ciphertext_hash(self):
# The crypttext_hash_tree asserts the integrity of the decoded
# ciphertext, and exists to detect two sorts of problems. The first
def write(self, data):
self.producer.stopProducing()
+class ImmediatelyStoppingConsumer(MemoryConsumer):
+ def registerProducer(self, p, streaming):
+ MemoryConsumer.registerProducer(self, p, streaming)
+ self.producer.stopProducing()
+
class StallingConsumer(MemoryConsumer):
def __init__(self, halfway_cb):
MemoryConsumer.__init__(self)