import time
now = time.time
from foolscap.api import eventually
-from allmydata.util import base32, log, idlib
+from allmydata.util import base32, log
from twisted.internet import reactor
from share import Share, CommonShare
return res
class RequestToken:
- def __init__(self, peerid):
- self.peerid = peerid
+ def __init__(self, server):
+ self.server = server
class ShareFinder:
OVERDUE_TIMEOUT = 10.0
# test_dirnode, which creates us with storage_broker=None
if not self._started:
si = self.verifycap.storage_index
- servers = [(s.get_serverid(), s.get_rref())
- for s in self._storage_broker.get_servers_for_psi(si)]
+ servers = self._storage_broker.get_servers_for_psi(si)
self._servers = iter(servers)
self._started = True
# internal methods
def loop(self):
- pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
+ pending_s = ",".join([rt.server.name()
for rt in self.pending_requests]) # sort?
self.log(format="ShareFinder loop: running=%(running)s"
" hungry=%(hungry)s, pending=%(pending)s",
eventually(self.share_consumer.no_more_shares)
def send_request(self, server):
- peerid, rref = server
- req = RequestToken(peerid)
+ req = RequestToken(server)
self.pending_requests.add(req)
- lp = self.log(format="sending DYHB to [%(peerid)s]",
- peerid=idlib.shortnodeid_b2a(peerid),
+ lp = self.log(format="sending DYHB to [%(name)s]", name=server.name(),
level=log.NOISY, umid="Io7pyg")
time_sent = now()
- d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
+ d_ev = self._download_status.add_dyhb_sent(server.get_serverid(),
+ time_sent)
# TODO: get the timer from a Server object, it knows best
self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
self.overdue, req)
- d = rref.callRemote("get_buckets", self._storage_index)
+ d = server.get_rref().callRemote("get_buckets", self._storage_index)
d.addBoth(incidentally, self._request_retired, req)
d.addCallbacks(self._got_response, self._got_error,
- callbackArgs=(rref.version, peerid, req, d_ev,
- time_sent, lp),
- errbackArgs=(peerid, req, d_ev, lp))
+ callbackArgs=(server, req, d_ev, time_sent, lp),
+ errbackArgs=(server, req, d_ev, lp))
d.addErrback(log.err, format="error in send_request",
level=log.WEIRD, parent=lp, umid="rpdV0w")
d.addCallback(incidentally, eventually, self.loop)
self.overdue_requests.add(req)
eventually(self.loop)
- def _got_response(self, buckets, server_version, peerid, req, d_ev,
- time_sent, lp):
+ def _got_response(self, buckets, server, req, d_ev, time_sent, lp):
shnums = sorted([shnum for shnum in buckets])
time_received = now()
d_ev.finished(shnums, time_received)
dyhb_rtt = time_received - time_sent
if not buckets:
- self.log(format="no shares from [%(peerid)s]",
- peerid=idlib.shortnodeid_b2a(peerid),
+ self.log(format="no shares from [%(name)s]", name=server.name(),
level=log.NOISY, parent=lp, umid="U7d4JA")
return
shnums_s = ",".join([str(shnum) for shnum in shnums])
- self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
- shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
+ self.log(format="got shnums [%(shnums)s] from [%(name)s]",
+ shnums=shnums_s, name=server.name(),
level=log.NOISY, parent=lp, umid="0fcEZw")
shares = []
for shnum, bucket in buckets.iteritems():
- s = self._create_share(shnum, bucket, server_version, peerid,
- dyhb_rtt)
+ s = self._create_share(shnum, bucket, server, dyhb_rtt)
shares.append(s)
self._deliver_shares(shares)
- def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
+ def _create_share(self, shnum, bucket, server, dyhb_rtt):
if shnum in self._commonshares:
cs = self._commonshares[shnum]
else:
# 2: break _get_satisfaction into Deferred-attached pieces.
# Yuck.
self._commonshares[shnum] = cs
- s = Share(bucket, server_version, self.verifycap, cs, self.node,
- self._download_status, peerid, shnum, dyhb_rtt,
+ s = Share(bucket, server.get_version(), self.verifycap, cs, self.node,
+ self._download_status, server.get_serverid(), shnum, dyhb_rtt,
self._node_logparent)
return s
level=log.NOISY, umid="2n1qQw")
eventually(self.share_consumer.got_shares, shares)
- def _got_error(self, f, peerid, req, d_ev, lp):
+ def _got_error(self, f, server, req, d_ev, lp):
d_ev.finished("error", now())
- self.log(format="got error from [%(peerid)s]",
- peerid=idlib.shortnodeid_b2a(peerid), failure=f,
+ self.log(format="got error from [%(name)s]",
+ name=server.name(), failure=f,
level=log.UNUSUAL, parent=lp, umid="zUKdCw")