# This is a specific implementation of IShare for Tahoe's native storage
# servers. A different backend would use a different class.
- def __init__(self, rref, server_version, verifycap, commonshare, node,
- download_status, peerid, shnum, dyhb_rtt, logparent):
+ def __init__(self, rref, server, verifycap, commonshare, node,
+ download_status, shnum, dyhb_rtt, logparent):
self._rref = rref
- self._server_version = server_version
+ self._server = server
self._node = node # holds share_hash_tree and UEB
self.actual_segment_size = node.segment_size # might still be None
# XXX change node.guessed_segment_size to
# node.best_guess_segment_size()
self._UEB_length = None
self._commonshare = commonshare # holds block_hash_tree
self._download_status = download_status
- self._peerid = peerid
- self._peerid_s = base32.b2a(peerid)[:5]
self._storage_index = verifycap.storage_index
self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
self._shnum = shnum
# download can re-fetch it.
self._requested_blocks = [] # (segnum, set(observer2..))
- ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
+ v = server.get_version()
+ ver = v["http://allmydata.org/tahoe/protocols/storage/v1"]
self._overrun_ok = ver["tolerates-immutable-read-overrun"]
# If _overrun_ok and we guess the offsets correctly, we can get
# everything in one RTT. If _overrun_ok and we guess wrong, we might
# need a second RTT to fetch the rest.
self.had_corruption = False # for unit tests
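
# For reference, a sketch of the (assumed) shape of the dict returned by
# server.get_version(). Only the protocol URL key and the
# "tolerates-immutable-read-overrun" flag appear in this patch; the rest of
# the structure is illustrative:
#
#   {"http://allmydata.org/tahoe/protocols/storage/v1":
#        {"tolerates-immutable-read-overrun": True}}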
def __repr__(self):
- return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s)
+ return "Share(sh%d-on-%s)" % (self._shnum, self._server.name())
def is_alive(self):
# XXX: reconsider. If the share sees a single error, should it remain
# alive?
share=repr(self),
start=start, length=length,
level=log.NOISY, parent=self._lp, umid="sgVAyA")
- req_ev = ds.add_request_sent(self._peerid, self._shnum,
+ req_ev = ds.add_request_sent(self._server.get_serverid(),
+ self._shnum,
start, length, now())
d = self._send_request(start, length)
d.addCallback(self._got_data, start, length, req_ev, lp)
log.msg(format="error requesting %(start)d+%(length)d"
" from %(server)s for si %(si)s",
start=start, length=length,
- server=self._peerid_s, si=self._si_prefix,
+ server=self._server.name(), si=self._si_prefix,
failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
# retire our observers, assuming we won't be able to make any
# further progress
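
# The Share class above now depends on a "server" object rather than a bare
# peerid. A minimal sketch, assuming only the three methods used in this
# patch matter, of what such an object must provide; the class name and
# constructor are illustrative, not Tahoe's real IServer implementation:

from allmydata.util import base32

class SketchServer:
    def __init__(self, serverid, version):
        self._serverid = serverid # 20-byte binary id (the old "peerid")
        self._version = version   # dict keyed by protocol-version URL
    def get_serverid(self):
        return self._serverid
    def get_version(self):
        return self._version
    def name(self):
        # short printable form, standing in for base32.b2a(peerid)[:5]
        return base32.b2a(self._serverid)[:5]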
from twisted.internet import defer, reactor
from allmydata import uri
from allmydata.storage.server import storage_index_to_dir
-from allmydata.util import base32, fileutil, spans, log
+from allmydata.util import base32, fileutil, spans, log, hashutil
from allmydata.util.consumer import download_to_data, MemoryConsumer
from allmydata.immutable import upload, layout
-from allmydata.test.no_network import GridTestMixin
+from allmydata.test.no_network import GridTestMixin, NoNetworkServer
from allmydata.test.common import ShouldFailMixin
from allmydata.interfaces import NotEnoughSharesError, NoSharesError
from allmydata.immutable.downloader.common import BadSegmentNumberError, \
e2.finished(now+3)
self.failUnlessEqual(ds.get_active(), False)
+def make_server(clientid):
+ tubid = hashutil.tagged_hash("clientid", clientid)[:20]
+ return NoNetworkServer(tubid, None)
+def make_servers(clientids):
+ servers = {}
+ for clientid in clientids:
+ servers[clientid] = make_server(clientid)
+ return servers
+
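# Usage sketch for the helpers above: hashutil.tagged_hash is deterministic,
# so a given clientid always yields the same 20-byte tubid, and therefore a
# stable server name across test runs (assuming NoNetworkServer derives its
# name from the serverid it is given):
#
#   serverA = make_server("peer-A")
#   assert make_server("peer-A").get_serverid() == serverA.get_serverid()
#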
class MyShare:
- def __init__(self, shnum, peerid, rtt):
+ def __init__(self, shnum, server, rtt):
self._shnum = shnum
- self._peerid = peerid
- self._peerid_s = peerid
+ self._server = server
self._dyhb_rtt = rtt
def __repr__(self):
- return "sh%d-on-%s" % (self._shnum, self._peerid)
+ return "sh%d-on-%s" % (self._shnum, self._server.name())
class MySegmentFetcher(SegmentFetcher):
def __init__(self, *args, **kwargs):
def test_only_one_share(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(0, "peer-A", 0.0)]
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
def _check2(ign):
self.failUnless(node.failed)
self.failUnless(node.failed.check(NotEnoughSharesError))
- self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+ sname = serverA.name()
+ self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused=" % sname,
str(node.failed))
d.addCallback(_check2)
return d
def test_good_diversity_early(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
def test_good_diversity_late(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
sf.add_shares([])
d = flushEventualQueue()
def _check1(ign):
# we could satisfy the read entirely from the first server, but we'd
# prefer not to. Instead, we expect to only pull one share from the
# first server
- shares = [MyShare(0, "peer-A", 0.0),
- MyShare(1, "peer-A", 0.0),
- MyShare(2, "peer-A", 0.0),
- MyShare(3, "peer-B", 1.0),
- MyShare(4, "peer-C", 2.0),
+ servers = make_servers(["peer-A", "peer-B", "peer-C"])
+ shares = [MyShare(0, servers["peer-A"], 0.0),
+ MyShare(1, servers["peer-A"], 0.0),
+ MyShare(2, servers["peer-A"], 0.0),
+ MyShare(3, servers["peer-B"], 1.0),
+ MyShare(4, servers["peer-C"], 2.0),
]
sf.add_shares([])
d = flushEventualQueue()
sf = MySegmentFetcher(node, 0, 3, None)
# we satisfy the read entirely from the first server because we don't
# have any other choice.
- shares = [MyShare(0, "peer-A", 0.0),
- MyShare(1, "peer-A", 0.0),
- MyShare(2, "peer-A", 0.0),
- MyShare(3, "peer-A", 0.0),
- MyShare(4, "peer-A", 0.0),
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0),
+ MyShare(1, serverA, 0.0),
+ MyShare(2, serverA, 0.0),
+ MyShare(3, serverA, 0.0),
+ MyShare(4, serverA, 0.0),
]
sf.add_shares([])
d = flushEventualQueue()
sf = MySegmentFetcher(node, 0, 3, None)
# we satisfy the read entirely from the first server because we don't
# have any other choice.
- shares = [MyShare(0, "peer-A", 0.0),
- MyShare(1, "peer-A", 0.0),
- MyShare(2, "peer-A", 0.0),
- MyShare(3, "peer-A", 0.0),
- MyShare(4, "peer-A", 0.0),
+ serverA = make_server("peer-A")
+ shares = [MyShare(0, serverA, 0.0),
+ MyShare(1, serverA, 0.0),
+ MyShare(2, serverA, 0.0),
+ MyShare(3, serverA, 0.0),
+ MyShare(4, serverA, 0.0),
]
sf.add_shares(shares)
d = flushEventualQueue()
def test_overdue(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
def test_overdue_fails(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
- shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+ servers = make_servers(["peer-%d" % i for i in range(6)])
+ shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
sf.add_shares(shares)
sf.no_more_shares()
d = flushEventualQueue()
def _check4(ign):
self.failUnless(node.failed)
self.failUnless(node.failed.check(NotEnoughSharesError))
- self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+ sname = servers["peer-2"].name()
+ self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
str(node.failed))
d.addCallback(_check4)
return d
# we could satisfy the read entirely from the first server, but we'd
# prefer not to. Instead, we expect to only pull one share from the
# first server
- shares = [MyShare(0, "peer-A", 0.0),
- MyShare(1, "peer-B", 1.0),
- MyShare(0, "peer-C", 2.0), # this will be skipped
- MyShare(1, "peer-D", 3.0),
- MyShare(2, "peer-E", 4.0),
+ servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+ "peer-E"])
+ shares = [MyShare(0, servers["peer-A"], 0.0),
+ MyShare(1, servers["peer-B"], 1.0),
+ MyShare(0, servers["peer-C"], 2.0), # this will be skipped
+ MyShare(1, servers["peer-D"], 3.0),
+ MyShare(2, servers["peer-E"], 4.0),
]
sf.add_shares(shares[:3])
d = flushEventualQueue()
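
# A standalone illustration (not SegmentFetcher's real code) of the selection
# policy these tests exercise: collect k distinct share numbers, preferring
# to spread them across distinct servers, and skip duplicate shnums because
# they contribute no new data. The function name and signature are assumed
# for the sketch:

def choose_shares(offers, k):
    """offers: list of (shnum, server) pairs, in preference order."""
    chosen = []
    seen_shnums = set()
    seen_servers = set()
    # first pass: take shares that add both a new shnum and a new server
    for shnum, server in offers:
        if len(seen_shnums) >= k:
            break
        if shnum not in seen_shnums and server not in seen_servers:
            chosen.append((shnum, server))
            seen_shnums.add(shnum)
            seen_servers.add(server)
    # second pass: if server diversity fell short, accept repeat servers,
    # but never a duplicate shnum
    for shnum, server in offers:
        if len(seen_shnums) >= k:
            break
        if shnum not in seen_shnums:
            chosen.append((shnum, server))
            seen_shnums.add(shnum)
    return chosen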