move DownloadStopped from download.common to interfaces

diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py
index a4743d2eb6c62a8e3038d850342007d68da80c91..30485b4a020da0ad8b0a23eeba5c2a9b296174fa 100644
--- a/src/allmydata/test/test_download.py
+++ b/src/allmydata/test/test_download.py
@@ -5,11 +5,22 @@
 
 import os
 from twisted.trial import unittest
+from twisted.internet import defer, reactor
 from allmydata import uri
 from allmydata.storage.server import storage_index_to_dir
-from allmydata.util import base32, fileutil
-from allmydata.immutable import upload
-from allmydata.test.no_network import GridTestMixin
+from allmydata.util import base32, fileutil, spans, log, hashutil
+from allmydata.util.consumer import download_to_data, MemoryConsumer
+from allmydata.immutable import upload, layout
+from allmydata.test.no_network import GridTestMixin, NoNetworkServer
+from allmydata.test.common import ShouldFailMixin
+from allmydata.interfaces import NotEnoughSharesError, NoSharesError, \
+     DownloadStopped
+from allmydata.immutable.downloader.common import BadSegmentNumberError, \
+     BadCiphertextHashError, COMPLETE, OVERDUE, DEAD
+from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
+from allmydata.codec import CRSDecoder
+from foolscap.eventual import fireEventually, flushEventualQueue
 
 plaintext = "This is a moderate-sized file.\n" * 10
 mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10
@@ -67,20 +78,7 @@ mutable_shares = {
 }
 #--------- END stored_shares.py ----------------
 
-class DownloadTest(GridTestMixin, unittest.TestCase):
-    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
-    def test_download(self):
-        self.basedir = self.mktemp()
-        self.set_up_grid()
-        self.c0 = self.g.clients[0]
-
-        # do this to create the shares
-        #return self.create_shares()
-
-        self.load_shares()
-        d = self.download_immutable()
-        d.addCallback(self.download_mutable)
-        return d
+class _Base(GridTestMixin, ShouldFailMixin):
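+    # shared scaffolding for the download tests: load_shares() seeds the
+    # no-network grid with the canned shares defined above, and
+    # download_immutable()/download_mutable() check that they come back
+    # intact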
 
     def create_shares(self, ignored=None):
         u = upload.Data(plaintext, None)
@@ -173,10 +171,13 @@ class DownloadTest(GridTestMixin, unittest.TestCase):
 
     def download_immutable(self, ignored=None):
         n = self.c0.create_node_from_uri(immutable_uri)
-        d = n.download_to_data()
+        d = download_to_data(n)
         def _got_data(data):
             self.failUnlessEqual(data, plaintext)
         d.addCallback(_got_data)
+        # make sure we can use the same node twice
+        d.addCallback(lambda ign: download_to_data(n))
+        d.addCallback(_got_data)
         return d
 
     def download_mutable(self, ignored=None):
@@ -187,3 +188,1430 @@ class DownloadTest(GridTestMixin, unittest.TestCase):
         d.addCallback(_got_data)
         return d
 
+class DownloadTest(_Base, unittest.TestCase):
+    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
+    def test_download(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # do this to create the shares
+        #return self.create_shares()
+
+        self.load_shares()
+        d = self.download_immutable()
+        d.addCallback(self.download_mutable)
+        return d
+
+    def test_download_failover(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        self.load_shares()
+        si = uri.from_string(immutable_uri).get_storage_index()
+        si_dir = storage_index_to_dir(si)
+
+        n = self.c0.create_node_from_uri(immutable_uri)
+        d = download_to_data(n)
+        def _got_data(data):
+            self.failUnlessEqual(data, plaintext)
+        d.addCallback(_got_data)
+
+        def _clobber_some_shares(ign):
+            # find the three shares that were used, and delete them. Then
+            # download again, forcing the downloader to fail over to other
+            # shares
+            for s in n._cnode._node._shares:
+                for clientnum in immutable_shares:
+                    for shnum in immutable_shares[clientnum]:
+                        if s._shnum == shnum:
+                            fn = os.path.join(self.get_serverdir(clientnum),
+                                              "shares", si_dir, str(shnum))
+                            os.unlink(fn)
+        d.addCallback(_clobber_some_shares)
+        d.addCallback(lambda ign: download_to_data(n))
+        d.addCallback(_got_data)
+
+        def _clobber_most_shares(ign):
+            # delete all but one of the shares that are still alive
+            live_shares = [s for s in n._cnode._node._shares if s.is_alive()]
+            save_me = live_shares[0]._shnum
+            for clientnum in immutable_shares:
+                for shnum in immutable_shares[clientnum]:
+                    if shnum == save_me:
+                        continue
+                    fn = os.path.join(self.get_serverdir(clientnum),
+                                      "shares", si_dir, str(shnum))
+                    if os.path.exists(fn):
+                        os.unlink(fn)
+            # now the download should fail with NotEnoughSharesError
+            return self.shouldFail(NotEnoughSharesError, "1shares", None,
+                                   download_to_data, n)
+        d.addCallback(_clobber_most_shares)
+
+        def _clobber_all_shares(ign):
+            # delete the last remaining share
+            for clientnum in immutable_shares:
+                for shnum in immutable_shares[clientnum]:
+                    fn = os.path.join(self.get_serverdir(clientnum),
+                                      "shares", si_dir, str(shnum))
+                    if os.path.exists(fn):
+                        os.unlink(fn)
+            # now a new download should fail with NoSharesError. We want a
+            # new ImmutableFileNode so it will forget about the old shares.
+            # If we merely called create_node_from_uri() without first
+            # dereferencing the original node, the NodeMaker's _node_cache
+            # would give us back the old one.
+            n = None
+            n = self.c0.create_node_from_uri(immutable_uri)
+            return self.shouldFail(NoSharesError, "0shares", None,
+                                   download_to_data, n)
+        d.addCallback(_clobber_all_shares)
+        return d
+
+    def test_lost_servers(self):
+        # while downloading a file (after seg[0], before seg[1]), lose the
+        # three servers that we were using. The download should switch over
+        # to other servers.
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            self.uri = ur.uri
+            self.n = self.c0.create_node_from_uri(self.uri)
+            return download_to_data(self.n)
+        d.addCallback(_uploaded)
+        def _got_data(data):
+            self.failUnlessEqual(data, plaintext)
+        d.addCallback(_got_data)
+        def _kill_some_shares():
+            # find the shares that were used and delete them
+            shares = self.n._cnode._node._shares
+            shnums = sorted([s._shnum for s in shares])
+            self.failUnlessEqual(shnums, [0,1,2,3])
+
+            # break the RIBucketReader references
+            # (we don't break the RIStorageServer references, because that
+            # isn't needed to test the current downloader implementation)
+            for s in shares:
+                s._rref.broken = True
+        def _download_again(ign):
+            # download again, deleting some shares after the first write
+            # to the consumer
+            c = StallingConsumer(_kill_some_shares)
+            return self.n.read(c)
+        d.addCallback(_download_again)
+        def _check_failover(c):
+            self.failUnlessEqual("".join(c.chunks), plaintext)
+            shares = self.n._cnode._node._shares
+            shnums = sorted([s._shnum for s in shares])
+            self.failIfEqual(shnums, [0,1,2,3])
+        d.addCallback(_check_failover)
+        return d
+
+    def test_long_offset(self):
+        # bug #1154: mplayer doing a seek-to-end results in an offset of type
+        # 'long', rather than 'int', and apparently __len__ is required to
+        # return an int. Rewrote Spans/DataSpans to provide s.len() instead
+        # of len(s).
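+        # (In CPython 2, len() converts the __len__ result to a
+        # machine-sized int, so a long beyond sys.maxint raises
+        # OverflowError; hence the explicit .len() accessor.)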
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = MemoryConsumer()
+        d = n.read(c, 0L, 10L)
+        d.addCallback(lambda c: len("".join(c.chunks)))
+        d.addCallback(lambda size: self.failUnlessEqual(size, 10))
+        return d
+
+    def test_badguess(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        # Cause the downloader to guess a segsize that's too low, so it will
+        # ask for a segment number that's too high (beyond the end of the
+        # real list, causing BadSegmentNumberError), to exercise
+        # Segmentation._retry_bad_segment
+        n._cnode._maybe_create_download_node()
+        n._cnode._node._build_guessed_tables(90)
+
+        con1 = MemoryConsumer()
+        # plaintext size of 310 bytes, wrong-segsize of 90 bytes, will make
+        # us think that file[180:200] is in the third segment (segnum=2), but
+        # really there's only one segment
+        d = n.read(con1, 180, 20)
+        def _done(res):
+            self.failUnlessEqual("".join(con1.chunks), plaintext[180:200])
+        d.addCallback(_done)
+        return d
+
+    def test_simultaneous_badguess(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, and a non-default segsize, to
+        # exercise the offset-guessing code. Because we don't tell the
+        # downloader about the unusual segsize, it will guess wrong, and have
+        # to do extra roundtrips to get the correct data.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs, 8-wide hashtree
+        con1 = MemoryConsumer()
+        con2 = MemoryConsumer()
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            d1 = n.read(con1, 70, 20)
+            d2 = n.read(con2, 140, 20)
+            return defer.gatherResults([d1,d2])
+        d.addCallback(_uploaded)
+        def _done(res):
+            self.failUnlessEqual("".join(con1.chunks), plaintext[70:90])
+            self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
+        d.addCallback(_done)
+        return d
+
+    def test_simultaneous_goodguess(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, and a non-default segsize, to
+        # exercise the offset-guessing code. This time we *do* tell the
+        # downloader about the unusual segsize, so it can guess right.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs, 8-wide hashtree
+        con1 = MemoryConsumer()
+        con2 = MemoryConsumer()
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            d1 = n.read(con1, 70, 20)
+            d2 = n.read(con2, 140, 20)
+            return defer.gatherResults([d1,d2])
+        d.addCallback(_uploaded)
+        def _done(res):
+            self.failUnlessEqual("".join(con1.chunks), plaintext[70:90])
+            self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
+        d.addCallback(_done)
+        return d
+
+    def test_sequential_goodguess(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        data = (plaintext*100)[:30000] # multiple of k
+
+        # upload a file with multiple segments, and a non-default segsize, to
+        # exercise the offset-guessing code. This time we *do* tell the
+        # downloader about the unusual segsize, so it can guess right.
+        u = upload.Data(data, None)
+        u.max_segment_size = 6000 # 5 segs, 8-wide hashtree
+        con1 = MemoryConsumer()
+        con2 = MemoryConsumer()
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            d = n.read(con1, 12000, 20)
+            def _read1(ign):
+                self.failUnlessEqual("".join(con1.chunks), data[12000:12020])
+                return n.read(con2, 24000, 20)
+            d.addCallback(_read1)
+            def _read2(ign):
+                self.failUnlessEqual("".join(con2.chunks), data[24000:24020])
+            d.addCallback(_read2)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
+
+    def test_simultaneous_get_blocks(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        self.load_shares()
+        stay_empty = []
+
+        n = self.c0.create_node_from_uri(immutable_uri)
+        d = download_to_data(n)
+        def _use_shares(ign):
+            shares = list(n._cnode._node._shares)
+            s0 = shares[0]
+            # make sure .cancel works too
+            o0 = s0.get_block(0)
+            o0.subscribe(lambda **kwargs: stay_empty.append(kwargs))
+            o1 = s0.get_block(0)
+            o2 = s0.get_block(0)
+            o0.cancel()
+            o3 = s0.get_block(1) # state=BADSEGNUM
+            d1 = defer.Deferred()
+            d2 = defer.Deferred()
+            d3 = defer.Deferred()
+            o1.subscribe(lambda **kwargs: d1.callback(kwargs))
+            o2.subscribe(lambda **kwargs: d2.callback(kwargs))
+            o3.subscribe(lambda **kwargs: d3.callback(kwargs))
+            return defer.gatherResults([d1,d2,d3])
+        d.addCallback(_use_shares)
+        def _done(res):
+            r1,r2,r3 = res
+            self.failUnlessEqual(r1["state"], "COMPLETE")
+            self.failUnlessEqual(r2["state"], "COMPLETE")
+            self.failUnlessEqual(r3["state"], "BADSEGNUM")
+            self.failUnless("block" in r1)
+            self.failUnless("block" in r2)
+            self.failIf(stay_empty)
+        d.addCallback(_done)
+        return d
+
+    def test_simultaneous_onefails_onecancelled(self):
+        # This exercises an mplayer behavior described in ticket #1154. I
+        # believe that mplayer made two simultaneous webapi GET requests:
+        # one for an index region at the end of the (mp3/video) file, and
+        # one for the first block of the file (the order doesn't really
+        # matter). All GETs failed (NoSharesError) because of the
+        # type(__len__)==long bug. Each GET submitted a
+        # DownloadNode.get_segment() request, which was queued by the DN
+        # (DN._segment_requests), so the second one was blocked waiting on
+        # the first one. When the first one failed, DN.fetch_failed() was
+        # invoked, which errbacked the first GET but left the other one
+        # hanging (the lost-progress bug mentioned in #1154 comment 10).
+        #
+        # Then mplayer saw that the index-region GET had failed, so it
+        # cancelled the first-block GET (by closing the HTTP request),
+        # triggering stopProducer. The second GET was still waiting in the
+        # Deferred (between n.get_segment() and self._request_retired), so
+        # its _cancel_segment_request was still active and was invoked.
+        # However, DN._active_segment was None, since the DN was not
+        # working on any segment at that time: hence the error in #1154.
+
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle. Tell the downloader, so it can guess correctly.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            # corrupt all the shares so the download will fail
+            def _corruptor(s, debug=False):
+                which = 48 # first byte of block0
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            self.corrupt_all_shares(ur.uri, _corruptor)
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            con1 = MemoryConsumer()
+            con2 = MemoryConsumer()
+            d = n.read(con1, 0L, 20)
+            d2 = n.read(con2, 140L, 20)
+            # con2 will be cancelled, so d2 should fail with DownloadStopped
+            def _con2_should_not_succeed(res):
+                self.fail("the second read should not have succeeded")
+            def _con2_failed(f):
+                self.failUnless(f.check(DownloadStopped))
+            d2.addCallbacks(_con2_should_not_succeed, _con2_failed)
+
+            def _con1_should_not_succeed(res):
+                self.fail("the first read should not have succeeded")
+            def _con1_failed(f):
+                self.failUnless(f.check(NoSharesError))
+                con2.producer.stopProducing()
+                return d2
+            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
+    def test_simultaneous_onefails(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle. Tell the downloader, so it can guess correctly.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            # corrupt all the shares so the download will fail
+            def _corruptor(s, debug=False):
+                which = 48 # first byte of block0
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            self.corrupt_all_shares(ur.uri, _corruptor)
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            con1 = MemoryConsumer()
+            con2 = MemoryConsumer()
+            d = n.read(con1, 0L, 20)
+            d2 = n.read(con2, 140L, 20)
+            # con2 should wait for con1 to fail and then con2 should succeed.
+            # In particular, we should not lose progress. If this test fails,
+            # it will fail with a timeout error.
+            def _con2_should_succeed(res):
+                # this should succeed because we only corrupted the first
+                # segment of each share. The segment that holds [140:160] is
+                # fine, as are the hash chains and UEB.
+                self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
+            d2.addCallback(_con2_should_succeed)
+
+            def _con1_should_not_succeed(res):
+                self.fail("the first read should not have succeeded")
+            def _con1_failed(f):
+                self.failUnless(f.check(NoSharesError))
+                # we *don't* cancel the second one here: this exercises a
+                # lost-progress bug from #1154. We just wait for it to
+                # succeed.
+                return d2
+            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
+    def test_download_no_overrun(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        self.load_shares()
+
+        # tweak the client's copies of server-version data, so it believes
+        # that they're old and can't handle reads that overrun the length of
+        # the share. This exercises a different code path.
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
+            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+            v1["tolerates-immutable-read-overrun"] = False
+
+        n = self.c0.create_node_from_uri(immutable_uri)
+        d = download_to_data(n)
+        def _got_data(data):
+            self.failUnlessEqual(data, plaintext)
+        d.addCallback(_got_data)
+        return d
+
+    def test_download_segment(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+        cn = n._cnode
+        (d,c) = cn.get_segment(0)
+        def _got_segment((offset,data,decodetime)):
+            self.failUnlessEqual(offset, 0)
+            self.failUnlessEqual(len(data), len(plaintext))
+        d.addCallback(_got_segment)
+        return d
+
+    def test_download_segment_cancel(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+        cn = n._cnode
+        (d,c) = cn.get_segment(0)
+        fired = []
+        d.addCallback(fired.append)
+        c.cancel()
+        d = fireEventually()
+        d.addCallback(flushEventualQueue)
+        def _check(ign):
+            self.failUnlessEqual(fired, [])
+        d.addCallback(_check)
+        return d
+
+    def test_download_bad_segment(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+        cn = n._cnode
+        def _try_download():
+            (d,c) = cn.get_segment(1)
+            return d
+        d = self.shouldFail(BadSegmentNumberError, "badseg",
+                            "segnum=1, numsegs=1",
+                            _try_download)
+        return d
+
+    def test_download_segment_terminate(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+        cn = n._cnode
+        (d,c) = cn.get_segment(0)
+        fired = []
+        d.addCallback(fired.append)
+        self.c0.terminator.disownServiceParent()
+        d = fireEventually()
+        d.addCallback(flushEventualQueue)
+        def _check(ign):
+            self.failUnlessEqual(fired, [])
+        d.addCallback(_check)
+        return d
+
+    def test_pause(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+        c = PausingConsumer()
+        d = n.read(c)
+        def _downloaded(mc):
+            newdata = "".join(mc.chunks)
+            self.failUnlessEqual(newdata, plaintext)
+        d.addCallback(_downloaded)
+        return d
+
+    def test_pause_then_stop(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+        c = PausingAndStoppingConsumer()
+        d = self.shouldFail(DownloadStopped, "test_pause_then_stop",
+                            "our Consumer called stopProducing()",
+                            n.read, c)
+        return d
+
+    def test_stop(self):
+        # use a download target that stops after the first segment (#473)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+        c = StoppingConsumer()
+        d = self.shouldFail(DownloadStopped, "test_stop",
+                            "our Consumer called stopProducing()",
+                            n.read, c)
+        return d
+
+    def test_stop_immediately(self):
+        # and a target that stops right after registerProducer (maybe #1154)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = ImmediatelyStoppingConsumer() # stops after registerProducer
+        d = self.shouldFail(DownloadStopped, "test_stop_immediately",
+                            "our Consumer called stopProducing()",
+                            n.read, c)
+        return d
+
+    def test_stop_immediately2(self):
+        # and a target that stops right after registerProducer (maybe #1154)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = MemoryConsumer()
+        d0 = n.read(c)
+        c.producer.stopProducing()
+        d = self.shouldFail(DownloadStopped, "test_stop_immediately2",
+                            "our Consumer called stopProducing()",
+                            lambda: d0)
+        return d
+
+    def test_download_segment_bad_ciphertext_hash(self):
+        # The crypttext_hash_tree asserts the integrity of the decoded
+        # ciphertext, and exists to detect two sorts of problems. The first
+        # is a bug in zfec decode. The second is the "two-sided t-shirt"
+        # attack (found by Christian Grothoff), in which a malicious uploader
+        # creates two sets of shares (one for file A, second for file B),
+        # uploads a combination of them (shares 0-4 of A, 5-9 of B), and then
+        # builds an otherwise normal UEB around those shares: their goal is
+        # to give their victim a filecap which sometimes downloads the good A
+        # contents, and sometimes the bad B contents, depending upon which
+        # servers/shares they can get to. Having a hash of the ciphertext
+        # forces them to commit to exactly one version. (Christian's prize
+        # for finding this problem was a t-shirt with two sides: the shares
+        # of file A on the front, B on the back).
+
+        # Creating a set of shares with this property is too hard (it would
+        # require a lot of tampering with the uploader), although it'd be
+        # nice to do so and confirm our fix. So instead, we just damage the
+        # decoder. The tail decoder is rebuilt each time, so we need to use a
+        # file with multiple segments.
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 60 # 6 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+
+            d = download_to_data(n)
+            def _break_codec(data):
+                # the codec isn't created until the UEB is retrieved
+                node = n._cnode._node
+                vcap = node._verifycap
+                k, N = vcap.needed_shares, vcap.total_shares
+                bad_codec = BrokenDecoder()
+                bad_codec.set_params(node.segment_size, k, N)
+                node._codec = bad_codec
+            d.addCallback(_break_codec)
+            # now try to download it again. The broken codec will provide
+            # ciphertext that fails the hash test.
+            d.addCallback(lambda ign:
+                          self.shouldFail(BadCiphertextHashError, "badhash",
+                                          "hash failure in "
+                                          "ciphertext_hash_tree: segnum=0",
+                                          download_to_data, n))
+            return d
+        d.addCallback(_uploaded)
+        return d
+
+    def OFFtest_download_segment_XXX(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, and a non-default segsize, to
+        # exercise the offset-guessing code. This time we *do* tell the
+        # downloader about the unusual segsize, so it can guess right.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs, 8-wide hashtree
+        con1 = MemoryConsumer()
+        con2 = MemoryConsumer()
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            d1 = n.read(con1, 70, 20)
+            #d2 = n.read(con2, 140, 20)
+            d2 = defer.succeed(None)
+            return defer.gatherResults([d1,d2])
+        d.addCallback(_uploaded)
+        def _done(res):
+            self.failUnlessEqual("".join(con1.chunks), plaintext[70:90])
+            self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
+        #d.addCallback(_done)
+        return d
+
+    def test_duplicate_shares(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        self.load_shares()
+        # make sure everybody has a copy of sh0. The second server contacted
+        # will report two shares, and the ShareFinder will handle the
+        # duplicate by attaching both to the same CommonShare instance.
+        si = uri.from_string(immutable_uri).get_storage_index()
+        si_dir = storage_index_to_dir(si)
+        sh0_file = [sharefile
+                    for (shnum, serverid, sharefile)
+                    in self.find_uri_shares(immutable_uri)
+                    if shnum == 0][0]
+        sh0_data = open(sh0_file, "rb").read()
+        for clientnum in immutable_shares:
+            if 0 in immutable_shares[clientnum]:
+                continue
+            cdir = self.get_serverdir(clientnum)
+            target = os.path.join(cdir, "shares", si_dir, "0")
+            outf = open(target, "wb")
+            outf.write(sh0_data)
+            outf.close()
+
+        d = self.download_immutable()
+        return d
+
+    def test_verifycap(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+
+        n = self.c0.create_node_from_uri(immutable_uri)
+        vcap = n.get_verify_cap().to_string()
+        vn = self.c0.create_node_from_uri(vcap)
+        d = download_to_data(vn)
+        def _got_ciphertext(ciphertext):
+            self.failUnlessEqual(len(ciphertext), len(plaintext))
+            self.failIfEqual(ciphertext, plaintext)
+        d.addCallback(_got_ciphertext)
+        return d
+
+class BrokenDecoder(CRSDecoder):
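+    # a CRSDecoder that flips the low bit of the first byte of the first
+    # decoded buffer, simulating a decoder bug so the ciphertext hash check
+    # must catch it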
+    def decode(self, shares, shareids):
+        d = CRSDecoder.decode(self, shares, shareids)
+        def _decoded(buffers):
+            def _corruptor(s, which):
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            buffers[0] = _corruptor(buffers[0], 0) # flip lsb of first byte
+            return buffers
+        d.addCallback(_decoded)
+        return d
+
+
+class PausingConsumer(MemoryConsumer):
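+    # a MemoryConsumer that pauses the producer after each of its first two
+    # writes and resumes it 100ms later, to exercise the
+    # pauseProducing/resumeProducing path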
+    def __init__(self):
+        MemoryConsumer.__init__(self)
+        self.size = 0
+        self.writes = 0
+    def write(self, data):
+        self.size += len(data)
+        self.writes += 1
+        if self.writes <= 2:
+            # we happen to use 4 segments, and want to avoid pausing on the
+            # last one (since then the _unpause timer will still be running)
+            self.producer.pauseProducing()
+            reactor.callLater(0.1, self._unpause)
+        return MemoryConsumer.write(self, data)
+    def _unpause(self):
+        self.producer.resumeProducing()
+
+class PausingAndStoppingConsumer(PausingConsumer):
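+    # pauses the producer as soon as data arrives, then stops it entirely
+    # half a second later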
+    def write(self, data):
+        self.producer.pauseProducing()
+        reactor.callLater(0.5, self._stop)
+    def _stop(self):
+        self.producer.stopProducing()
+
+class StoppingConsumer(PausingConsumer):
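+    # stops the producer as soon as any data arrives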
+    def write(self, data):
+        self.producer.stopProducing()
+
+class ImmediatelyStoppingConsumer(MemoryConsumer):
+    def registerProducer(self, p, streaming):
+        MemoryConsumer.registerProducer(self, p, streaming)
+        self.producer.stopProducing()
+
+class StallingConsumer(MemoryConsumer):
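+    # calls halfway_cb after the first write, giving tests a chance to
+    # (e.g.) break servers in the middle of a download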
+    def __init__(self, halfway_cb):
+        MemoryConsumer.__init__(self)
+        self.halfway_cb = halfway_cb
+        self.writes = 0
+    def write(self, data):
+        self.writes += 1
+        if self.writes == 1:
+            self.halfway_cb()
+        return MemoryConsumer.write(self, data)
+
+class Corruption(_Base, unittest.TestCase):
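+    # these tests corrupt bytes in stored shares and watch how the
+    # downloader copes: isolated corruption should be survived (perhaps by
+    # fetching an extra share), while identical corruption in every share
+    # should produce a clean failure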
+
+    def _corrupt_flip(self, ign, imm_uri, which):
+        log.msg("corrupt %d" % which)
+        def _corruptor(s, debug=False):
+            return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+        self.corrupt_shares_numbered(imm_uri, [0], _corruptor)
+
+    def _corrupt_set(self, ign, imm_uri, which, newvalue):
+        log.msg("corrupt %d" % which)
+        def _corruptor(s, debug=False):
+            return s[:which] + chr(newvalue) + s[which+1:]
+        self.corrupt_shares_numbered(imm_uri, [0], _corruptor)
+
+    def test_each_byte(self):
+        # Setting catalog_detection=True performs an exhaustive test of the
+        # Downloader's response to corruption in the lsb of each byte of the
+        # 2070-byte share, with two goals: make sure we tolerate all forms of
+        # corruption (i.e. don't hang or return bad data), and make a list of
+        # which bytes can be corrupted without influencing the download
+        # (since we don't need every byte of the share). That takes 50s to
+        # run on my laptop and doesn't have any actual asserts, so we don't
+        # normally do that.
+        self.catalog_detection = False
+
+        self.basedir = "download/Corruption/each_byte"
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # to exercise the block-hash-tree code properly, we need to have
+        # multiple segments. We don't tell the downloader about the different
+        # segsize, so it guesses wrong and must do extra roundtrips.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 120 # 3 segs, 4-wide hashtree
+
+        if self.catalog_detection:
+            undetected = spans.Spans()
+
+        def _download(ign, imm_uri, which, expected):
+            n = self.c0.create_node_from_uri(imm_uri)
+            n._cnode._maybe_create_download_node()
+            # for this test to work, we need to have a new Node each time.
+            # Make sure the NodeMaker's weakcache hasn't interfered.
+            assert not n._cnode._node._shares
+            d = download_to_data(n)
+            def _got_data(data):
+                self.failUnlessEqual(data, plaintext)
+                shnums = sorted([s._shnum for s in n._cnode._node._shares])
+                no_sh0 = bool(0 not in shnums)
+                sh0 = [s for s in n._cnode._node._shares if s._shnum == 0]
+                sh0_had_corruption = False
+                if sh0 and sh0[0].had_corruption:
+                    sh0_had_corruption = True
+                num_needed = len(n._cnode._node._shares)
+                if self.catalog_detection:
+                    detected = no_sh0 or sh0_had_corruption or (num_needed!=3)
+                    if not detected:
+                        undetected.add(which, 1)
+                if expected == "no-sh0":
+                    self.failIfIn(0, shnums)
+                elif expected == "0bad-need-3":
+                    self.failIf(no_sh0)
+                    self.failUnless(sh0[0].had_corruption)
+                    self.failUnlessEqual(num_needed, 3)
+                elif expected == "need-4th":
+                    self.failIf(no_sh0)
+                    self.failUnless(sh0[0].had_corruption)
+                    self.failIfEqual(num_needed, 3)
+            d.addCallback(_got_data)
+            return d
+
+
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            imm_uri = ur.uri
+            self.shares = self.copy_shares(imm_uri)
+            d = defer.succeed(None)
+            # 'victims' is a list of corruption tests to run. Each one flips
+            # the low-order bit of the specified offset in the share file (so
+            # offset=0 is the MSB of the container version, offset=15 is the
+            # LSB of the share version, offset=24 is the MSB of the
+            # data-block-offset, and offset=48 is the first byte of the first
+            # data-block). Each one also specifies what sort of corruption
+            # we're expecting to see.
+            no_sh0_victims = [0,1,2,3] # container version
+            need3_victims =  [ ] # none currently in this category
+            # when the offsets are corrupted, the Share will be unable to
+            # retrieve the data it wants (because it thinks that data lives
+            # off in the weeds somewhere), and Share treats DataUnavailable
+            # as abandon-this-share, so in general we'll be forced to look
+            # for a 4th share.
+            need_4th_victims = [12,13,14,15, # share version
+                                24,25,26,27, # offset[data]
+                                32,33,34,35, # offset[crypttext_hash_tree]
+                                36,37,38,39, # offset[block_hashes]
+                                44,45,46,47, # offset[UEB]
+                                ]
+            need_4th_victims.append(48) # block data
+            # when corrupting hash trees, we must corrupt a value that isn't
+            # directly set from somewhere else. Since we download data from
+            # seg0, corrupt something on its hash chain, like [2] (the
+            # right-hand child of the root)
+            need_4th_victims.append(600+2*32) # block_hashes[2]
+            # Share.loop is pretty conservative: it abandons the share at the
+            # first sign of corruption. It doesn't strictly need to be this
+            # way: if the UEB were corrupt, we could still get good block
+            # data from that share, as long as there was a good copy of the
+            # UEB elsewhere. If this behavior is relaxed, then corruption in
+            # the following fields (which are present in multiple shares)
+            # should fall into the "need3_victims" case instead of the
+            # "need_4th_victims" case.
+            need_4th_victims.append(376+2*32) # crypttext_hash_tree[2]
+            need_4th_victims.append(824) # share_hashes
+            need_4th_victims.append(994) # UEB length
+            need_4th_victims.append(998) # UEB
+            corrupt_me = ([(i,"no-sh0") for i in no_sh0_victims] +
+                          [(i, "0bad-need-3") for i in need3_victims] +
+                          [(i, "need-4th") for i in need_4th_victims])
+            if self.catalog_detection:
+                corrupt_me = [(i, "") for i in range(len(self.sh0_orig))]
+            for i,expected in corrupt_me:
+                # All these tests result in a successful download. What we're
+                # measuring is how many shares the downloader had to use.
+                d.addCallback(self._corrupt_flip, imm_uri, i)
+                d.addCallback(_download, imm_uri, i, expected)
+                d.addCallback(lambda ign: self.restore_all_shares(self.shares))
+                d.addCallback(fireEventually)
+            corrupt_values = [(3, 2, "no-sh0"),
+                              (15, 2, "need-4th"), # share looks v2
+                              ]
+            for i,newvalue,expected in corrupt_values:
+                d.addCallback(self._corrupt_set, imm_uri, i, newvalue)
+                d.addCallback(_download, imm_uri, i, expected)
+                d.addCallback(lambda ign: self.restore_all_shares(self.shares))
+                d.addCallback(fireEventually)
+            return d
+        d.addCallback(_uploaded)
+        def _show_results(ign):
+            print
+            print ("of [0:%d], corruption ignored in %s" %
+                   (len(self.sh0_orig), undetected.dump()))
+        if self.catalog_detection:
+            d.addCallback(_show_results)
+            # of [0:2070], corruption ignored in len=1133:
+            # [4-11],[16-23],[28-31],[152-439],[600-663],[1309-2069]
+            #  [4-11]: container sizes
+            #  [16-23]: share block/data sizes
+            #  [152-375]: plaintext hash tree
+            #  [376-408]: crypttext_hash_tree[0] (root)
+            #  [408-439]: crypttext_hash_tree[1] (computed)
+            #  [600-631]: block hash tree[0] (root)
+            #  [632-663]: block hash tree[1] (computed)
+            #  [1309-]: reserved+unused UEB space
+        return d
+
+    def test_failure(self):
+        # this test corrupts all shares in the same way, and asserts that the
+        # download fails.
+
+        self.basedir = "download/Corruption/failure"
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # to exercise the block-hash-tree code properly, we need to have
+        # multiple segments. We don't tell the downloader about the different
+        # segsize, so it guesses wrong and must do extra roundtrips.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 120 # 3 segs, 4-wide hashtree
+
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            imm_uri = ur.uri
+            self.shares = self.copy_shares(imm_uri)
+
+            corrupt_me = [(48, "block data", "Last failure: None"),
+                          (600+2*32, "block_hashes[2]", "BadHashError"),
+                          (376+2*32, "crypttext_hash_tree[2]", "BadHashError"),
+                          (824, "share_hashes", "BadHashError"),
+                          ]
+            def _download(imm_uri):
+                n = self.c0.create_node_from_uri(imm_uri)
+                n._cnode._maybe_create_download_node()
+                # for this test to work, we need to have a new Node each time.
+                # Make sure the NodeMaker's weakcache hasn't interfered.
+                assert not n._cnode._node._shares
+                return download_to_data(n)
+
+            d = defer.succeed(None)
+            for i,which,substring in corrupt_me:
+                # All these tests result in a failed download.
+                d.addCallback(self._corrupt_flip_all, imm_uri, i)
+                # (bind which/substring now, so each case checks its own
+                # expected failure)
+                d.addCallback(lambda ign, which=which, substring=substring:
+                              self.shouldFail(NoSharesError, which,
+                                              substring,
+                                              _download, imm_uri))
+                d.addCallback(lambda ign: self.restore_all_shares(self.shares))
+                d.addCallback(fireEventually)
+            return d
+        d.addCallback(_uploaded)
+
+        return d
+
+    def _corrupt_flip_all(self, ign, imm_uri, which):
+        def _corruptor(s, debug=False):
+            return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+        self.corrupt_all_shares(imm_uri, _corruptor)
+
+class DownloadV2(_Base, unittest.TestCase):
+    # tests which exercise v2-share code. They first upload a file with
+    # FORCE_V2 set.
+
+    def setUp(self):
+        d = defer.maybeDeferred(_Base.setUp, self)
+        def _set_force_v2(ign):
+            self.old_force_v2 = layout.FORCE_V2
+            layout.FORCE_V2 = True
+        d.addCallback(_set_force_v2)
+        return d
+    def tearDown(self):
+        layout.FORCE_V2 = self.old_force_v2
+        return _Base.tearDown(self)
+
+    def test_download(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file
+        u = upload.Data(plaintext, None)
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            imm_uri = ur.uri
+            n = self.c0.create_node_from_uri(imm_uri)
+            return download_to_data(n)
+        d.addCallback(_uploaded)
+        return d
+
+    def test_download_no_overrun(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # tweak the client's copies of server-version data, so it believes
+        # that they're old and can't handle reads that overrun the length of
+        # the share. This exercises a different code path.
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
+            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+            v1["tolerates-immutable-read-overrun"] = False
+
+        # upload a file
+        u = upload.Data(plaintext, None)
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            imm_uri = ur.uri
+            n = self.c0.create_node_from_uri(imm_uri)
+            return download_to_data(n)
+        d.addCallback(_uploaded)
+        return d
+
+    def OFF_test_no_overrun_corrupt_shver(self): # unnecessary
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
+            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+            v1["tolerates-immutable-read-overrun"] = False
+
+        # upload a file
+        u = upload.Data(plaintext, None)
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            imm_uri = ur.uri
+            def _do_corrupt(which, newvalue):
+                def _corruptor(s, debug=False):
+                    return s[:which] + chr(newvalue) + s[which+1:]
+                self.corrupt_shares_numbered(imm_uri, [0], _corruptor)
+            _do_corrupt(12+3, 0x00)
+            n = self.c0.create_node_from_uri(imm_uri)
+            d = download_to_data(n)
+            def _got_data(data):
+                self.failUnlessEqual(data, plaintext)
+            d.addCallback(_got_data)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
+class Status(unittest.TestCase):
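+    # unit tests for DownloadStatus accounting (status strings, progress,
+    # and active-ness); no grid or network involved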
+    def test_status(self):
+        now = 12345.1
+        ds = DownloadStatus("si-1", 123)
+        self.failUnlessEqual(ds.get_status(), "idle")
+        ev0 = ds.add_segment_request(0, now)
+        self.failUnlessEqual(ds.get_status(), "fetching segment 0")
+        ev0.activate(now+0.5)
+        ev0.deliver(now+1, 0, 1000, 2.0)
+        self.failUnlessEqual(ds.get_status(), "idle")
+        ev2 = ds.add_segment_request(2, now+2)
+        del ev2 # hush pyflakes
+        ev1 = ds.add_segment_request(1, now+2)
+        self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
+        ev1.error(now+3)
+        self.failUnlessEqual(ds.get_status(),
+                             "fetching segment 2; errors on segment 1")
+
+    def test_progress(self):
+        now = 12345.1
+        ds = DownloadStatus("si-1", 123)
+        self.failUnlessEqual(ds.get_progress(), 0.0)
+        e = ds.add_read_event(0, 1000, now)
+        self.failUnlessEqual(ds.get_progress(), 0.0)
+        e.update(500, 2.0, 2.0)
+        self.failUnlessEqual(ds.get_progress(), 0.5)
+        e.finished(now+2)
+        self.failUnlessEqual(ds.get_progress(), 1.0)
+
+        e1 = ds.add_read_event(1000, 2000, now+3)
+        e2 = ds.add_read_event(4000, 2000, now+3)
+        self.failUnlessEqual(ds.get_progress(), 0.0)
+        e1.update(1000, 2.0, 2.0)
+        self.failUnlessEqual(ds.get_progress(), 0.25)
+        e2.update(1000, 2.0, 2.0)
+        self.failUnlessEqual(ds.get_progress(), 0.5)
+        e1.update(1000, 2.0, 2.0)
+        e1.finished(now+4)
+        # now there is only one outstanding read, and it is 50% done
+        self.failUnlessEqual(ds.get_progress(), 0.5)
+        e2.update(1000, 2.0, 2.0)
+        e2.finished(now+5)
+        self.failUnlessEqual(ds.get_progress(), 1.0)
+
+    def test_active(self):
+        now = 12345.1
+        ds = DownloadStatus("si-1", 123)
+        self.failUnlessEqual(ds.get_active(), False)
+        e1 = ds.add_read_event(0, 1000, now)
+        self.failUnlessEqual(ds.get_active(), True)
+        e2 = ds.add_read_event(1, 1000, now+1)
+        self.failUnlessEqual(ds.get_active(), True)
+        e1.finished(now+2)
+        self.failUnlessEqual(ds.get_active(), True)
+        e2.finished(now+3)
+        self.failUnlessEqual(ds.get_active(), False)
+
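+# helpers to fabricate NoNetworkServer instances with deterministic tubids,
+# for the share-selection tests below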
+def make_server(clientid):
+    tubid = hashutil.tagged_hash("clientid", clientid)[:20]
+    return NoNetworkServer(tubid, None)
+def make_servers(clientids):
+    servers = {}
+    for clientid in clientids:
+        servers[clientid] = make_server(clientid)
+    return servers
+
+class MyShare:
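+    # a minimal stand-in for a Share, carrying just the attributes that
+    # share selection looks at: share number, server, and DYHB round-trip
+    # time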
+    def __init__(self, shnum, server, rtt):
+        self._shnum = shnum
+        self._server = server
+        self._dyhb_rtt = rtt
+    def __repr__(self):
+        return "sh%d-on-%s" % (self._shnum, self._server.get_name())
+
+class MySegmentFetcher(SegmentFetcher):
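+    # records the shares it would have started in _test_start_shares instead
+    # of actually requesting blocks; tests feed results back in through
+    # _block_request_activity()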
+    def __init__(self, *args, **kwargs):
+        SegmentFetcher.__init__(self, *args, **kwargs)
+        self._test_start_shares = []
+    def _start_share(self, share, shnum):
+        self._test_start_shares.append(share)
+
+class FakeNode:
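+    # records the SegmentFetcher's callbacks (want_more_shares,
+    # fetch_failed, process_blocks) so tests can assert on how the fetcher
+    # reacted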
+    def __init__(self):
+        self.want_more = 0
+        self.failed = None
+        self.processed = None
+        self._si_prefix = "si_prefix"
+    def want_more_shares(self):
+        self.want_more += 1
+    def fetch_failed(self, fetcher, f):
+        self.failed = f
+    def process_blocks(self, segnum, blocks):
+        self.processed = (segnum, blocks)
+    def get_num_segments(self):
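+        # pretend the file has a single segment, and report that count as
+        # authoritative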
+        return 1, True
+
+class Selection(unittest.TestCase):
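+    # exercises SegmentFetcher's share-selection and server-diversity logic
+    # against the fake node/share/server stand-ins above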
+    def test_no_shares(self):
+        node = FakeNode()
+        sf = SegmentFetcher(node, 0, 3, None)
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NoSharesError))
+        d.addCallback(_check2)
+        return d
+
+    def test_only_one_share(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            sname = serverA.get_name()
+            self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused="  % sname,
+                              str(node.failed))
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_avoid_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        servers = make_servers(["peer-A", "peer-B", "peer-C"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-A"], 0.0),
+                  MyShare(2, servers["peer-A"], 0.0),
+                  MyShare(3, servers["peer-B"], 1.0),
+                  MyShare(4, servers["peer-C"], 2.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[3], shares[4]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      3: "block-3",
+                                                      4: "block-4"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(node.want_more, 3)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice. Unlike the _late variant above, all five
+        # shares are known up front, in a single add_shares() call.
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
+                  ]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 2)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_overdue(self):
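+        # all ten shares are known up front. When the first three block
+        # requests are marked OVERDUE, the fetcher should start the next
+        # three shares and complete the segment from blocks 3,4,5.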
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
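+            # the three OVERDUE requests should have triggered three
+            # replacement shares (sh3..sh5)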
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {3: "block-3",
+                                                      4: "block-4",
+                                                      5: "block-5"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_overdue_fails(self):
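+        # only six shares exist, one per server, and no_more_shares() is
+        # called immediately. After the first three requests go OVERDUE,
+        # their replacements are marked DEAD, one overdue request completes
+        # and another dies, at most two blocks can ever arrive (< k=3), so
+        # the fetcher must fail with NotEnoughSharesError.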
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        servers = make_servers(["peer-%d" % i for i in range(6)])
+        shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
+        sf.add_shares(shares)
+        sf.no_more_shares()
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
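+            # the replacements (sh3..sh5) were started; now kill them all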
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            # we're still waiting
+            self.failUnlessEqual(node.processed, None)
+            self.failUnlessEqual(node.failed, None)
+            # now complete one of the overdue ones, and kill one of the other
+            # ones, leaving one hanging. This should trigger a failure, since
+            # we cannot succeed.
+            live = sf._test_start_shares[0]
+            die = sf._test_start_shares[1]
+            sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+            sf._block_request_activity(die, die._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            sname = servers["peer-2"].get_name()
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
+                              str(node.failed))
+        d.addCallback(_check4)
+        return d
+
+    def test_avoid_redundancy(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # sh0 is offered by two servers (peer-A and peer-C). Once the request
+        # for sh0 on peer-A is running, the duplicate copy on peer-C should
+        # be skipped rather than fetched redundantly.
+        servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+                                "peer-E"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-B"], 1.0),
+                  MyShare(0, servers["peer-C"], 2.0), # this will be skipped
+                  MyShare(1, servers["peer-D"], 3.0),
+                  MyShare(2, servers["peer-E"], 4.0),
+                  ]
+        sf.add_shares(shares[:3])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1]])
+            # allow sh1 to retire
+            sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            # and then feed in the remaining shares
+            sf.add_shares(shares[3:])
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
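+            # sh0-on-peer-C (shares[2]) was skipped; sh2 came from peer-E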
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[4]])
+            sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+            sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check4)
+        return d