import time
now = time.time

from foolscap.api import eventually
from allmydata.util import base32, log, idlib
from twisted.internet import reactor

from share import Share, CommonShare
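
# ShareFinder locates shares for one immutable file: it sends DYHB ("Do You
# Have Block") queries to storage servers, wraps each bucket that comes back
# in a Share, and delivers those Shares to the consuming node.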

def incidentally(res, f, *args, **kwargs):
    """Add me to a Deferred chain like this:
     d.addBoth(incidentally, func, arg)
    and I'll behave as if you'd added the following function:
     def _(res):
         f(*args, **kwargs)
         return res
    This is useful if you want to execute an expression when the Deferred
    fires, but don't care about its value.
    """
    f(*args, **kwargs)
    return res

class RequestToken:
    def __init__(self, peerid):
        self.peerid = peerid

class ShareFinder:
    OVERDUE_TIMEOUT = 10.0
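    # OVERDUE_TIMEOUT is how long (in seconds) we wait for a DYHB response
    # before declaring the request overdue; see overdue() below.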

    def __init__(self, storage_broker, verifycap, node, download_status,
                 logparent=None, max_outstanding_requests=10):
        self.running = True # stopped by Share.stop, from Terminator
        self.verifycap = verifycap
        self._started = False
        self._storage_broker = storage_broker
        self.share_consumer = self.node = node
        self.max_outstanding_requests = max_outstanding_requests
        self._hungry = False

        self._commonshares = {} # shnum to CommonShare instance
        self.pending_requests = set()
        self.overdue_requests = set() # subset of pending_requests
        self.overdue_timers = {}

        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a_l(self._storage_index[:8], 60)
        self._node_logparent = logparent
        self._download_status = download_status
        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
                           si=self._si_prefix,
                           level=log.NOISY, parent=logparent, umid="2xjj2A")

    def update_num_segments(self):
        (numsegs, authoritative) = self.node.get_num_segments()
        assert authoritative
        for cs in self._commonshares.values():
            cs.set_authoritative_num_segments(numsegs)

    def start_finding_servers(self):
        # don't get servers until somebody uses us: creating the
        # ImmutableFileNode should not cause work to happen yet. Test case is
        # test_dirnode, which creates us with storage_broker=None
        if self._started:
            return
        si = self.verifycap.storage_index
        servers = [(s.get_serverid(), s.get_rref())
                   for s in self._storage_broker.get_servers_for_psi(si)]
        self._servers = iter(servers)
        self._started = True
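
    # Note: get_servers_for_psi() returns servers in an order permuted by the
    # storage index, so queries for different files are spread across the
    # grid. loop() consumes this iterator one server at a time.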

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._lp
        return log.msg(*args, **kwargs)

    def stop(self):
        self.running = False
        while self.overdue_timers:
            req, t = self.overdue_timers.popitem()
            t.cancel()

    # called by our parent CiphertextDownloader
    def hungry(self):
        self.log(format="ShareFinder[si=%(si)s] hungry",
                 si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
        self.start_finding_servers()
        self._hungry = True
        eventually(self.loop)

    def loop(self):
        pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
                              for rt in self.pending_requests]) # sort?
        self.log(format="ShareFinder loop: running=%(running)s"
                 " hungry=%(hungry)s, pending=%(pending)s",
                 running=self.running, hungry=self._hungry, pending=pending_s,
                 level=log.NOISY, umid="kRtS4Q")
        if not self.running:
            return
        if not self._hungry:
            return

        non_overdue = self.pending_requests - self.overdue_requests
        if len(non_overdue) >= self.max_outstanding_requests:
            # cannot send more requests, must wait for some to retire
            return

        server = None
        try:
            if self._servers:
                server = self._servers.next()
        except StopIteration:
            self._servers = None

        if server:
            self.send_request(server)
            # we loop again to get parallel queries. The check above will
            # prevent us from looping forever.
            eventually(self.loop)
            return

        if self.pending_requests:
            # no server, but there are still requests in flight: maybe one of
            # them will make progress
            return

        self.log(format="ShareFinder.loop: no_more_shares, ever",
                 level=log.UNUSUAL, umid="XjQlzg")
        # we've run out of servers (so we can't send any more requests), and
        # we have nothing in flight. No further progress can be made. They
        # are destined to remain hungry.
        eventually(self.share_consumer.no_more_shares)
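
    # A DYHB query is a remote get_buckets() call; the server responds with a
    # dict mapping shnum to a bucket for each share of this storage index
    # that it holds (an empty dict if it has none).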
    def send_request(self, server):
        peerid, rref = server
        req = RequestToken(peerid)
        self.pending_requests.add(req)
        lp = self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="Io7pyg")
        time_sent = now()
        d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
        # TODO: get the timer from a Server object, it knows best
        self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
                                                     self.overdue, req)
        d = rref.callRemote("get_buckets", self._storage_index)
        d.addBoth(incidentally, self._request_retired, req)
        d.addCallbacks(self._got_response, self._got_error,
                       callbackArgs=(rref.version, peerid, req, d_ev,
                                     time_sent, lp),
                       errbackArgs=(peerid, req, d_ev, lp))
        d.addErrback(log.err, format="error in send_request",
                     level=log.WEIRD, parent=lp, umid="rpdV0w")
        d.addCallback(incidentally, eventually, self.loop)
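        # Note the ordering of this Deferred chain: _request_retired always
        # fires first (on success or failure), then _got_response/_got_error,
        # then an eventual loop() to decide whether another query is needed.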

    def _request_retired(self, req):
        self.pending_requests.discard(req)
        self.overdue_requests.discard(req)
        if req in self.overdue_timers:
            self.overdue_timers[req].cancel()
            del self.overdue_timers[req]
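
    # overdue() fires when a DYHB request has been outstanding for
    # OVERDUE_TIMEOUT seconds. The request stays in pending_requests (a late
    # response is still useful), but moving it into overdue_requests stops it
    # from counting against max_outstanding_requests, so loop() is free to
    # try another server in the meantime.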
    def overdue(self, req):
        del self.overdue_timers[req]
        assert req in self.pending_requests # paranoia, should never be false
        self.overdue_requests.add(req)
        eventually(self.loop)

    def _got_response(self, buckets, server_version, peerid, req, d_ev,
                      time_sent, lp):
        shnums = sorted([shnum for shnum in buckets])
        time_received = now()
        d_ev.finished(shnums, time_received)
        dyhb_rtt = time_received - time_sent
        if not buckets:
            self.log(format="no shares from [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="U7d4JA")
            return
        shnums_s = ",".join([str(shnum) for shnum in shnums])
        self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
                 shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY, parent=lp, umid="0fcEZw")
        shares = []
        for shnum, bucket in buckets.iteritems():
            s = self._create_share(shnum, bucket, server_version, peerid,
                                   dyhb_rtt)
            shares.append(s)
        self._deliver_shares(shares)
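
    # Shares with the same shnum (copies held by different servers) all share
    # a single CommonShare, which carries the per-shnum state (such as the
    # block hash tree) that is identical across those copies.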
    def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
        if shnum in self._commonshares:
            cs = self._commonshares[shnum]
        else:
            numsegs, authoritative = self.node.get_num_segments()
            cs = CommonShare(numsegs, self._si_prefix, shnum,
                             self._node_logparent)
            if authoritative:
                cs.set_authoritative_num_segments(numsegs)
            # Share._get_satisfaction is responsible for updating
            # CommonShare.set_numsegs after we know the UEB. Alternatives:
            #  1: d = self.node.get_num_segments()
            #     d.addCallback(cs.got_numsegs)
            #     The problem is that the OneShotObserverList I was using
            #     inserts an eventual-send between _get_satisfaction's
            #     _satisfy_UEB and _satisfy_block_hash_tree, and the
            #     CommonShare didn't get the num_segs message before
            #     being asked to set block hash values. To resolve this
            #     would require an immediate ObserverList instead of an
            #     eventual-send -based one.
            #  2: break _get_satisfaction into Deferred-attached pieces.
            self._commonshares[shnum] = cs
        s = Share(bucket, server_version, self.verifycap, cs, self.node,
                  self._download_status, peerid, shnum, dyhb_rtt,
                  self._node_logparent)
        return s

    def _deliver_shares(self, shares):
        # they will call hungry() again if they want more
        shares_s = ",".join([str(sh) for sh in shares])
        self.log(format="delivering shares: %s" % shares_s,
                 level=log.NOISY, umid="2n1qQw")
        eventually(self.share_consumer.got_shares, shares)

    def _got_error(self, f, peerid, req, d_ev, lp):
        d_ev.finished("error", now())
        self.log(format="got error from [%(peerid)s]",
                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
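
# Consumer contract (sketch): the node acts as the share_consumer. It calls
# hungry() whenever it wants more shares; ShareFinder answers with eventual
# got_shares(shares) calls as DYHB responses arrive, and with a single
# no_more_shares() once the server list and all in-flight requests are
# exhausted.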