from allmydata.test.common import ShouldFailMixin
from allmydata.interfaces import NotEnoughSharesError, NoSharesError
from allmydata.immutable.downloader.common import BadSegmentNumberError, \
- BadCiphertextHashError, DownloadStopped
+ BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
from allmydata.codec import CRSDecoder
from foolscap.eventual import fireEventually, flushEventualQueue
# shares
servers = []
shares = sorted([s._shnum for s in self.n._cnode._node._shares])
- self.failUnlessEqual(shares, [0,1,2])
+ self.failUnlessEqual(shares, [0,1,2,3])
# break the RIBucketReader references
for s in self.n._cnode._node._shares:
s._rref.broken = True
self.failUnlessEqual("".join(c.chunks), plaintext)
shares = sorted([s._shnum for s in self.n._cnode._node._shares])
# we should now be using a different set of shares than we started with
- self.failIfEqual(shares, [0,1,2])
+ self.failIfEqual(shares, [0,1,2,3])
d.addCallback(_check_failover)
return d
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
- self.failUnless(f.check(NotEnoughSharesError))
+ self.failUnless(f.check(NoSharesError))
con2.producer.stopProducing()
return d2
d.addCallbacks(_con1_should_not_succeed, _con1_failed)
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
- self.failUnless(f.check(NotEnoughSharesError))
+ self.failUnless(f.check(NoSharesError))
# we *don't* cancel the second one here: this exercises a
# lost-progress bug from #1154. We just wait for it to
# succeed.
# All these tests result in a failed download.
d.addCallback(self._corrupt_flip_all, imm_uri, i)
d.addCallback(lambda ign:
- self.shouldFail(NotEnoughSharesError, which,
+ self.shouldFail(NoSharesError, which,
substring,
_download, imm_uri))
d.addCallback(lambda ign: self.restore_all_shares(self.shares))
e2.update(1000, 2.0, 2.0)
e2.finished(now+5)
self.failUnlessEqual(ds.get_progress(), 1.0)
+
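+# A minimal stand-in for a Share: just the attributes the SegmentFetcher
+# tests below inspect (share number, server id, and DYHB round-trip time).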
+class MyShare:
+ def __init__(self, shnum, peerid, rtt):
+ self._shnum = shnum
+ self._peerid = peerid
+ self._peerid_s = peerid
+ self._dyhb_rtt = rtt
+ def __repr__(self):
+ return "sh%d-on-%s" % (self._shnum, self._peerid)
+
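+# Subclass that records the shares it would have started fetching, instead
+# of sending real block requests.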
+class MySegmentFetcher(SegmentFetcher):
+ def __init__(self, *args, **kwargs):
+ SegmentFetcher.__init__(self, *args, **kwargs)
+ self._test_start_shares = []
+ def _start_share(self, share, shnum):
+ self._test_start_shares.append(share)
+
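+# A stub DownloadNode: it counts want_more_shares() calls and records any
+# failure or processed blocks so the tests can assert on them.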
+class FakeNode:
+ def __init__(self):
+ self.want_more = 0
+ self.failed = None
+ self.processed = None
+ self._si_prefix = "si_prefix"
+ def want_more_shares(self):
+ self.want_more += 1
+ def fetch_failed(self, fetcher, f):
+ self.failed = f
+ def process_blocks(self, segnum, blocks):
+ self.processed = (segnum, blocks)
+ def get_num_segments(self):
+ return 1, True
+
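+# These tests drive SegmentFetcher's share-selection logic directly: each
+# fetcher is asked for segment 0 with k=3 needed blocks, and
+# flushEventualQueue() lets its eventual-send loop run between steps.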
+class Selection(unittest.TestCase):
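+ # With no shares at all, the fetcher asks for more, then fails with
+ # NoSharesError once it is told that none are coming.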
+ def test_no_shares(self):
+ node = FakeNode()
+ sf = SegmentFetcher(node, 0, 3, None)
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(node.failed, None)
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NoSharesError))
+ d.addCallback(_check2)
+ return d
+
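+ # One share when we need three: the fetcher fails with
+ # NotEnoughSharesError and names the still-pending share in the message.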
+ def test_only_one_share(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(0, "peer-A", 0.0)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(node.failed, None)
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NotEnoughSharesError))
+ self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+ str(node.failed))
+ d.addCallback(_check2)
+ return d
+
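+ # Ten shares on ten distinct servers, all known up front: the fetcher
+ # starts the first three and never needs to ask for more.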
+ def test_good_diversity_early(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check2)
+ return d
+
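+ # Same as above, except the shares only arrive after an initial empty
+ # batch has already made the fetcher ask for more.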
+ def test_good_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_avoid_bad_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we could satisfy the read entirely from the first server, but we'd
+ # prefer not to. Instead, we expect to only pull one share from the
+ # first server
+ shares = [MyShare(0, "peer-A", 0.0),
+ MyShare(1, "peer-A", 0.0),
+ MyShare(2, "peer-A", 0.0),
+ MyShare(3, "peer-B", 1.0),
+ MyShare(4, "peer-C", 2.0),
+ ]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[3], shares[4]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 3: "block-3",
+ 4: "block-4"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_suffer_bad_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we satisfy the read entirely from the first server because we don't
+ # have any other choice.
+ shares = [MyShare(0, "peer-A", 0.0),
+ MyShare(1, "peer-A", 0.0),
+ MyShare(2, "peer-A", 0.0),
+ MyShare(3, "peer-A", 0.0),
+ MyShare(4, "peer-A", 0.0),
+ ]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(node.want_more, 3)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[2]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_suffer_bad_diversity_early(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we satisfy the read entirely from the first server because we don't
+ # have any other choice.
+ shares = [MyShare(0, "peer-A", 0.0),
+ MyShare(1, "peer-A", 0.0),
+ MyShare(2, "peer-A", 0.0),
+ MyShare(3, "peer-A", 0.0),
+ MyShare(4, "peer-A", 0.0),
+ ]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 2)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[2]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check2)
+ return d
+
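+ # When the first three block requests go OVERDUE, the fetcher starts
+ # three replacement shares, and the segment completes from those.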
+ def test_overdue(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, OVERDUE)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:6])
+ for sh in sf._test_start_shares[3:]:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {3: "block-3",
+ 4: "block-4",
+ 5: "block-5"}) )
+ d.addCallback(_check3)
+ return d
+
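+ # Overdue requests whose replacements then go DEAD: once only one
+ # completed share and one hung share remain, the fetch must fail with
+ # NotEnoughSharesError.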
+ def test_overdue_fails(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+ sf.add_shares(shares)
+ sf.no_more_shares()
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, OVERDUE)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:6])
+ for sh in sf._test_start_shares[3:]:
+ sf._block_request_activity(sh, sh._shnum, DEAD)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ # we're still waiting
+ self.failUnlessEqual(node.processed, None)
+ self.failUnlessEqual(node.failed, None)
+ # now complete one of the overdue ones, and kill one of the other
+ # ones, leaving one hanging. This should trigger a failure, since
+ # we cannot succeed.
+ live = sf._test_start_shares[0]
+ die = sf._test_start_shares[1]
+ sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+ sf._block_request_activity(die, die._shnum, DEAD)
+ return flushEventualQueue()
+ d.addCallback(_check3)
+ def _check4(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NotEnoughSharesError))
+ self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+ str(node.failed))
+ d.addCallback(_check4)
+ return d
+
+ def test_avoid_redundancy(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # sh0-on-peer-C duplicates a share number we are already fetching from
+ # peer-A, so we expect the fetcher to skip it and start a new share
+ # number instead
+ shares = [MyShare(0, "peer-A", 0.0),
+ MyShare(1, "peer-B", 1.0),
+ MyShare(0, "peer-C", 2.0), # this will be skipped
+ MyShare(1, "peer-D", 3.0),
+ MyShare(2, "peer-E", 4.0),
+ ]
+ sf.add_shares(shares[:3])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1]])
+ # allow sh1 to retire
+ sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ # and then feed in the remaining shares
+ sf.add_shares(shares[3:])
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[4]])
+ sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+ sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+ return flushEventualQueue()
+ d.addCallback(_check3)
+ def _check4(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check4)
+ return d