from twisted.internet import defer, reactor
from allmydata import uri
from allmydata.storage.server import storage_index_to_dir
-from allmydata.util import base32, fileutil, spans, log
+from allmydata.util import base32, fileutil, spans, log, hashutil
from allmydata.util.consumer import download_to_data, MemoryConsumer
from allmydata.immutable import upload, layout
-from allmydata.test.no_network import GridTestMixin
+from allmydata.test.no_network import GridTestMixin, NoNetworkServer
from allmydata.test.common import ShouldFailMixin
-from allmydata.interfaces import NotEnoughSharesError, NoSharesError
+from allmydata.interfaces import NotEnoughSharesError, NoSharesError, \
+ DownloadStopped
from allmydata.immutable.downloader.common import BadSegmentNumberError, \
- BadCiphertextHashError, DownloadStopped
+ BadCiphertextHashError, COMPLETE, OVERDUE, DEAD
from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
from allmydata.codec import CRSDecoder
from foolscap.eventual import fireEventually, flushEventualQueue
def _got_data(data):
self.failUnlessEqual(data, plaintext)
d.addCallback(_got_data)
- def _kill_some_servers():
- # find the three shares that were used, and delete them. Then
- # download again, forcing the downloader to fail over to other
- # shares
- servers = []
- shares = sorted([s._shnum for s in self.n._cnode._node._shares])
- self.failUnlessEqual(shares, [0,1,2])
+ def _kill_some_shares():
+ # find the shares that were used and delete them
+ shares = self.n._cnode._node._shares
+ shnums = sorted([s._shnum for s in shares])
+ self.failUnlessEqual(shnums, [0,1,2,3])
+
# break the RIBucketReader references
- for s in self.n._cnode._node._shares:
+ # (we don't break the RIStorageServer references, because that
+ # isn't needed to test the current downloader implementation)
+ for s in shares:
s._rref.broken = True
- for servernum in immutable_shares:
- for shnum in immutable_shares[servernum]:
- if s._shnum == shnum:
- ss = self.g.servers_by_number[servernum]
- servers.append(ss)
- # and, for good measure, break the RIStorageServer references
- # too, just in case the downloader gets more aggressive in the
- # future and tries to re-fetch the same share.
- for ss in servers:
- wrapper = self.g.servers_by_id[ss.my_nodeid]
- wrapper.broken = True
def _download_again(ign):
- c = StallingConsumer(_kill_some_servers)
+ # download again, deleting some shares after the first write
+ # to the consumer
+ c = StallingConsumer(_kill_some_shares)
return self.n.read(c)
d.addCallback(_download_again)
def _check_failover(c):
self.failUnlessEqual("".join(c.chunks), plaintext)
- shares = sorted([s._shnum for s in self.n._cnode._node._shares])
- # we should now be using more shares than we were before
- self.failIfEqual(shares, [0,1,2])
+ shares = self.n._cnode._node._shares
+ shnums = sorted([s._shnum for s in shares])
+ self.failIfEqual(shnums, [0,1,2,3])
d.addCallback(_check_failover)
return d
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
- self.failUnless(f.check(NotEnoughSharesError))
+ self.failUnless(f.check(NoSharesError))
con2.producer.stopProducing()
return d2
d.addCallbacks(_con1_should_not_succeed, _con1_failed)
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
- self.failUnless(f.check(NotEnoughSharesError))
+ self.failUnless(f.check(NoSharesError))
# we *don't* cancel the second one here: this exercises a
# lost-progress bug from #1154. We just wait for it to
# succeed.
# tweak the client's copies of server-version data, so it believes
# that they're old and can't handle reads that overrun the length of
# the share. This exercises a different code path.
- for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+ for s in self.c0.storage_broker.get_connected_servers():
+ rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
# All these tests result in a failed download.
d.addCallback(self._corrupt_flip_all, imm_uri, i)
d.addCallback(lambda ign:
- self.shouldFail(NotEnoughSharesError, which,
+ self.shouldFail(NoSharesError, which,
substring,
_download, imm_uri))
d.addCallback(lambda ign: self.restore_all_shares(self.shares))
# tweak the client's copies of server-version data, so it believes
# that they're old and can't handle reads that overrun the length of
# the share. This exercises a different code path.
- for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+ for s in self.c0.storage_broker.get_connected_servers():
+ rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
self.set_up_grid()
self.c0 = self.g.clients[0]
- for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+ for s in self.c0.storage_broker.get_connected_servers():
+ rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
now = 12345.1
ds = DownloadStatus("si-1", 123)
self.failUnlessEqual(ds.get_status(), "idle")
- ds.add_segment_request(0, now)
+ ev0 = ds.add_segment_request(0, now)
self.failUnlessEqual(ds.get_status(), "fetching segment 0")
- ds.add_segment_delivery(0, now+1, 0, 1000, 2.0)
+ ev0.activate(now+0.5)
+ ev0.deliver(now+1, 0, 1000, 2.0)
self.failUnlessEqual(ds.get_status(), "idle")
- ds.add_segment_request(2, now+2)
- ds.add_segment_request(1, now+2)
+ ev2 = ds.add_segment_request(2, now+2)
+ del ev2 # hush pyflakes
+ ev1 = ds.add_segment_request(1, now+2)
self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
- ds.add_segment_error(1, now+3)
+ ev1.error(now+3)
self.failUnlessEqual(ds.get_status(),
"fetching segment 2; errors on segment 1")
e2.update(1000, 2.0, 2.0)
e2.finished(now+5)
self.failUnlessEqual(ds.get_progress(), 1.0)
+
+ def test_active(self):
+ now = 12345.1
+ ds = DownloadStatus("si-1", 123)
+ self.failUnlessEqual(ds.get_active(), False)
+ e1 = ds.add_read_event(0, 1000, now)
+ self.failUnlessEqual(ds.get_active(), True)
+ e2 = ds.add_read_event(1, 1000, now+1)
+ self.failUnlessEqual(ds.get_active(), True)
+ e1.finished(now+2)
+ self.failUnlessEqual(ds.get_active(), True)
+ e2.finished(now+3)
+ self.failUnlessEqual(ds.get_active(), False)
+
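+# Helpers for the SegmentFetcher unit tests below: build NoNetworkServer
+# instances (keyed by clientid) with a synthetic tubid, so share selection
+# can be exercised without a real grid.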
+def make_server(clientid):
+ tubid = hashutil.tagged_hash("clientid", clientid)[:20]
+ return NoNetworkServer(tubid, None)
+def make_servers(clientids):
+ servers = {}
+ for clientid in clientids:
+ servers[clientid] = make_server(clientid)
+ return servers
+
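+# Minimal stand-in for a Share object, carrying only the attributes (shnum,
+# server, DYHB round-trip time) that these selection tests rely on.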
+class MyShare:
+ def __init__(self, shnum, server, rtt):
+ self._shnum = shnum
+ self._server = server
+ self._dyhb_rtt = rtt
+ def __repr__(self):
+ return "sh%d-on-%s" % (self._shnum, self._server.get_name())
+
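+# SegmentFetcher subclass whose _start_share() records the chosen share in
+# _test_start_shares instead of issuing real block requests.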
+class MySegmentFetcher(SegmentFetcher):
+ def __init__(self, *args, **kwargs):
+ SegmentFetcher.__init__(self, *args, **kwargs)
+ self._test_start_shares = []
+ def _start_share(self, share, shnum):
+ self._test_start_shares.append(share)
+
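+# Stub DownloadNode: counts want_more_shares() calls, records any fetch
+# failure and any delivered blocks, and claims a single authoritative
+# segment, so tests can inspect the fetcher's behavior.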
+class FakeNode:
+ def __init__(self):
+ self.want_more = 0
+ self.failed = None
+ self.processed = None
+ self._si_prefix = "si_prefix"
+ def want_more_shares(self):
+ self.want_more += 1
+ def fetch_failed(self, fetcher, f):
+ self.failed = f
+ def process_blocks(self, segnum, blocks):
+ self.processed = (segnum, blocks)
+ def get_num_segments(self):
+ return 1, True
+
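+# Unit tests for SegmentFetcher's share-selection logic: server diversity,
+# overdue/dead request handling, and avoidance of redundant share numbers.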
+class Selection(unittest.TestCase):
+ def test_no_shares(self):
+ node = FakeNode()
+ sf = SegmentFetcher(node, 0, 3, None)
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(node.failed, None)
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NoSharesError))
+ d.addCallback(_check2)
+ return d
+
+ def test_only_one_share(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(node.failed, None)
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NotEnoughSharesError))
+ sname = serverA.get_name()
+ self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused=" % sname,
+ str(node.failed))
+ d.addCallback(_check2)
+ return d
+
+ def test_good_diversity_early(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check2)
+ return d
+
+ def test_good_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_avoid_bad_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we could satisfy the read entirely from the first server, but we'd
+ # prefer not to. Instead, we expect to only pull one share from the
+ # first server
+ servers = make_servers(["peer-A", "peer-B", "peer-C"])
+ shares = [MyShare(0, servers["peer-A"], 0.0),
+ MyShare(1, servers["peer-A"], 0.0),
+ MyShare(2, servers["peer-A"], 0.0),
+ MyShare(3, servers["peer-B"], 1.0),
+ MyShare(4, servers["peer-C"], 2.0),
+ ]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[3], shares[4]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 3: "block-3",
+ 4: "block-4"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_suffer_bad_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we satisfy the read entirely from the first server because we don't
+ # have any other choice.
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0),
+ MyShare(1, serverA, 0.0),
+ MyShare(2, serverA, 0.0),
+ MyShare(3, serverA, 0.0),
+ MyShare(4, serverA, 0.0),
+ ]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(node.want_more, 3)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[2]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_suffer_bad_diversity_early(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we satisfy the read entirely from the first server because we don't
+ # have any other choice.
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0),
+ MyShare(1, serverA, 0.0),
+ MyShare(2, serverA, 0.0),
+ MyShare(3, serverA, 0.0),
+ MyShare(4, serverA, 0.0),
+ ]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 2)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[2]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check2)
+ return d
+
+ def test_overdue(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, OVERDUE)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:6])
+ for sh in sf._test_start_shares[3:]:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {3: "block-3",
+ 4: "block-4",
+ 5: "block-5"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_overdue_fails(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ servers = make_servers(["peer-%d" % i for i in range(6)])
+ shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
+ sf.add_shares(shares)
+ sf.no_more_shares()
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, OVERDUE)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:6])
+ for sh in sf._test_start_shares[3:]:
+ sf._block_request_activity(sh, sh._shnum, DEAD)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ # we're still waiting
+ self.failUnlessEqual(node.processed, None)
+ self.failUnlessEqual(node.failed, None)
+ # now complete one of the overdue ones, and kill one of the other
+ # ones, leaving one hanging. This should trigger a failure, since
+ # we cannot succeed.
+ live = sf._test_start_shares[0]
+ die = sf._test_start_shares[1]
+ sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+ sf._block_request_activity(die, die._shnum, DEAD)
+ return flushEventualQueue()
+ d.addCallback(_check3)
+ def _check4(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NotEnoughSharesError))
+ sname = servers["peer-2"].get_name()
+ self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
+ str(node.failed))
+ d.addCallback(_check4)
+ return d
+
+ def test_avoid_redundancy(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+        # each share number should be fetched only once: sh0 is offered by
+        # both peer-A and peer-C, and we expect the fetcher to skip the
+        # duplicate copy on peer-C
+ servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+ "peer-E"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-B"], 1.0),
+                  MyShare(0, servers["peer-C"], 2.0), # this will be skipped
+                  MyShare(1, servers["peer-D"], 3.0),
+                  MyShare(2, servers["peer-E"], 4.0),
+ ]
+ sf.add_shares(shares[:3])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1]])
+ # allow sh1 to retire
+ sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ # and then feed in the remaining shares
+ sf.add_shares(shares[3:])
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[4]])
+ sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+ sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+ return flushEventualQueue()
+ d.addCallback(_check3)
+ def _check4(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check4)
+ return d