from twisted.internet import defer, reactor
from allmydata import uri
from allmydata.storage.server import storage_index_to_dir
-from allmydata.util import base32, fileutil, spans, log
+from allmydata.util import base32, fileutil, spans, log, hashutil
from allmydata.util.consumer import download_to_data, MemoryConsumer
from allmydata.immutable import upload, layout
-from allmydata.test.no_network import GridTestMixin
+from allmydata.test.no_network import GridTestMixin, NoNetworkServer
from allmydata.test.common import ShouldFailMixin
-from allmydata.interfaces import NotEnoughSharesError, NoSharesError
+from allmydata.interfaces import NotEnoughSharesError, NoSharesError, \
+ DownloadStopped
from allmydata.immutable.downloader.common import BadSegmentNumberError, \
- BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
+ BadCiphertextHashError, COMPLETE, OVERDUE, DEAD
from allmydata.immutable.downloader.status import DownloadStatus
from allmydata.immutable.downloader.fetcher import SegmentFetcher
from allmydata.codec import CRSDecoder
def _got_data(data):
self.failUnlessEqual(data, plaintext)
d.addCallback(_got_data)
- def _kill_some_servers():
- # find the three shares that were used, and delete them. Then
- # download again, forcing the downloader to fail over to other
- # shares
- servers = []
- shares = sorted([s._shnum for s in self.n._cnode._node._shares])
- self.failUnlessEqual(shares, [0,1,2,3])
+ def _kill_some_shares():
+ # find the shares that were used and delete them
+ shares = self.n._cnode._node._shares
+ shnums = sorted([s._shnum for s in shares])
+ self.failUnlessEqual(shnums, [0,1,2,3])
+
# break the RIBucketReader references
- for s in self.n._cnode._node._shares:
+ # (we don't break the RIStorageServer references, because that
+ # isn't needed to test the current downloader implementation)
+ for s in shares:
s._rref.broken = True
- for servernum in immutable_shares:
- for shnum in immutable_shares[servernum]:
- if s._shnum == shnum:
- ss = self.g.servers_by_number[servernum]
- servers.append(ss)
- # and, for good measure, break the RIStorageServer references
- # too, just in case the downloader gets more aggressive in the
- # future and tries to re-fetch the same share.
- for ss in servers:
- wrapper = self.g.servers_by_id[ss.my_nodeid]
- wrapper.broken = True
def _download_again(ign):
- c = StallingConsumer(_kill_some_servers)
+ # download again, deleting some shares after the first write
+ # to the consumer
+ c = StallingConsumer(_kill_some_shares)
return self.n.read(c)
d.addCallback(_download_again)
def _check_failover(c):
self.failUnlessEqual("".join(c.chunks), plaintext)
- shares = sorted([s._shnum for s in self.n._cnode._node._shares])
- # we should now be using more shares than we were before
- self.failIfEqual(shares, [0,1,2,3])
+ shares = self.n._cnode._node._shares
+ shnums = sorted([s._shnum for s in shares])
+ self.failIfEqual(shnums, [0,1,2,3])
d.addCallback(_check_failover)
return d
# tweak the client's copies of server-version data, so it believes
# that they're old and can't handle reads that overrun the length of
# the share. This exercises a different code path.
- for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+ for s in self.c0.storage_broker.get_connected_servers():
+ rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
# tweak the client's copies of server-version data, so it believes
# that they're old and can't handle reads that overrun the length of
# the share. This exercises a different code path.
- for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+ for s in self.c0.storage_broker.get_connected_servers():
+ rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
self.set_up_grid()
self.c0 = self.g.clients[0]
- for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+ for s in self.c0.storage_broker.get_connected_servers():
+ rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
now = 12345.1
ds = DownloadStatus("si-1", 123)
self.failUnlessEqual(ds.get_status(), "idle")
- ds.add_segment_request(0, now)
+ ev0 = ds.add_segment_request(0, now)
self.failUnlessEqual(ds.get_status(), "fetching segment 0")
- ds.add_segment_delivery(0, now+1, 0, 1000, 2.0)
+ ev0.activate(now+0.5)
+ ev0.deliver(now+1, 0, 1000, 2.0)
self.failUnlessEqual(ds.get_status(), "idle")
- ds.add_segment_request(2, now+2)
- ds.add_segment_request(1, now+2)
+ ev2 = ds.add_segment_request(2, now+2)
+ del ev2 # hush pyflakes
+ ev1 = ds.add_segment_request(1, now+2)
self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
- ds.add_segment_error(1, now+3)
+ ev1.error(now+3)
self.failUnlessEqual(ds.get_status(),
"fetching segment 2; errors on segment 1")
e2.finished(now+3)
self.failUnlessEqual(ds.get_active(), False)
+def make_server(clientid):
+    # Build a fake server for unit tests: derive a stable 20-byte tubid
+    # from the given clientid string and wrap it in a NoNetworkServer
+    # (rref=None; these tests only need server identity/naming, not I/O).
+    tubid = hashutil.tagged_hash("clientid", clientid)[:20]
+    return NoNetworkServer(tubid, None)
+def make_servers(clientids):
+    # Map each clientid to its own fake NoNetworkServer (see make_server),
+    # so tests can place multiple shares on distinct named servers.
+    servers = {}
+    for clientid in clientids:
+        servers[clientid] = make_server(clientid)
+    return servers
+
class MyShare:
+    # Minimal stand-in for a downloader Share: records shnum, the (fake)
+    # server holding it, and a synthetic DYHB round-trip time used by the
+    # SegmentFetcher's server-selection logic.
-    def __init__(self, shnum, peerid, rtt):
+    def __init__(self, shnum, server, rtt):
self._shnum = shnum
-        self._peerid = peerid
-        self._peerid_s = peerid
+        self._server = server
self._dyhb_rtt = rtt
def __repr__(self):
+        # e.g. "sh0-on-xgru5" — matches the substrings asserted in tests
-        return "sh%d-on-%s" % (self._shnum, self._peerid)
+        return "sh%d-on-%s" % (self._shnum, self._server.get_name())
class MySegmentFetcher(SegmentFetcher):
def __init__(self, *args, **kwargs):
def test_only_one_share(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(0, "peer-A", 0.0)]
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
def _check2(ign):
self.failUnless(node.failed)
self.failUnless(node.failed.check(NotEnoughSharesError))
- self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+ sname = serverA.get_name()
+ self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused=" % sname,
str(node.failed))
d.addCallback(_check2)
return d
def test_good_diversity_early(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
def test_good_diversity_late(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
sf.add_shares([])
d = flushEventualQueue()
def _check1(ign):
# we could satisfy the read entirely from the first server, but we'd
# prefer not to. Instead, we expect to only pull one share from the
# first server
- shares = [MyShare(0, "peer-A", 0.0),
- MyShare(1, "peer-A", 0.0),
- MyShare(2, "peer-A", 0.0),
- MyShare(3, "peer-B", 1.0),
- MyShare(4, "peer-C", 2.0),
+ servers = make_servers(["peer-A", "peer-B", "peer-C"])
+ shares = [MyShare(0, servers["peer-A"], 0.0),
+ MyShare(1, servers["peer-A"], 0.0),
+ MyShare(2, servers["peer-A"], 0.0),
+ MyShare(3, servers["peer-B"], 1.0),
+ MyShare(4, servers["peer-C"], 2.0),
]
sf.add_shares([])
d = flushEventualQueue()
sf = MySegmentFetcher(node, 0, 3, None)
# we satisfy the read entirely from the first server because we don't
# have any other choice.
- shares = [MyShare(0, "peer-A", 0.0),
- MyShare(1, "peer-A", 0.0),
- MyShare(2, "peer-A", 0.0),
- MyShare(3, "peer-A", 0.0),
- MyShare(4, "peer-A", 0.0),
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0),
+ MyShare(1, serverA, 0.0),
+ MyShare(2, serverA, 0.0),
+ MyShare(3, serverA, 0.0),
+ MyShare(4, serverA, 0.0),
]
sf.add_shares([])
d = flushEventualQueue()
sf = MySegmentFetcher(node, 0, 3, None)
# we satisfy the read entirely from the first server because we don't
# have any other choice.
- shares = [MyShare(0, "peer-A", 0.0),
- MyShare(1, "peer-A", 0.0),
- MyShare(2, "peer-A", 0.0),
- MyShare(3, "peer-A", 0.0),
- MyShare(4, "peer-A", 0.0),
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0),
+ MyShare(1, serverA, 0.0),
+ MyShare(2, serverA, 0.0),
+ MyShare(3, serverA, 0.0),
+ MyShare(4, serverA, 0.0),
]
sf.add_shares(shares)
d = flushEventualQueue()
def test_overdue(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
def test_overdue_fails(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+ servers = make_servers(["peer-%d" % i for i in range(6)])
+ shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
sf.add_shares(shares)
sf.no_more_shares()
d = flushEventualQueue()
def _check4(ign):
self.failUnless(node.failed)
self.failUnless(node.failed.check(NotEnoughSharesError))
- self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+ sname = servers["peer-2"].get_name()
+ self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
str(node.failed))
d.addCallback(_check4)
return d
# we could satisfy the read entirely from the first server, but we'd
# prefer not to. Instead, we expect to only pull one share from the
# first server
- shares = [MyShare(0, "peer-A", 0.0),
- MyShare(1, "peer-B", 1.0),
- MyShare(0, "peer-C", 2.0), # this will be skipped
- MyShare(1, "peer-D", 3.0),
- MyShare(2, "peer-E", 4.0),
+ servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+ "peer-E"])
+ shares = [MyShare(0, servers["peer-A"],0.0),
+ MyShare(1, servers["peer-B"],1.0),
+ MyShare(0, servers["peer-C"],2.0), # this will be skipped
+ MyShare(1, servers["peer-D"],3.0),
+ MyShare(2, servers["peer-E"],4.0),
]
sf.add_shares(shares[:3])
d = flushEventualQueue()