# src/allmydata/immutable/downloader/finder.py

import time
now = time.time
from foolscap.api import eventually
from allmydata.util import base32, log, idlib
from twisted.internet import reactor

from share import Share, CommonShare

def incidentally(res, f, *args, **kwargs):
    """Add me to a Deferred chain like this:
     d.addBoth(incidentally, func, arg)
    and I'll behave as if you'd added the following function:
     def _(res):
         func(arg)
         return res
    This is useful if you want to execute an expression when the Deferred
    fires, but don't care about its value.
    """
    f(*args, **kwargs)
    return res

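# A hedged usage sketch (hypothetical names, not part of this module):
# incidentally() runs a side effect while passing the Deferred's value
# through untouched.
#
#   from twisted.internet import defer
#
#   def _note(tag):
#       print "fired:", tag            # side effect only; result is ignored
#
#   d = defer.succeed("some-result")
#   d.addBoth(incidentally, _note, "req-1")
#   d.addCallback(lambda res: res.upper())  # still receives "some-result"
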
class RequestToken:
    # An opaque marker for one outstanding DYHB ("Do You Have Block?")
    # request, so it can be tracked in the pending/overdue sets below.
    def __init__(self, peerid):
        self.peerid = peerid

class ShareFinder:
    OVERDUE_TIMEOUT = 10.0

    def __init__(self, storage_broker, verifycap, node, download_status,
                 logparent=None, max_outstanding_requests=10):
        self.running = True # cleared by stop(), from Terminator
        self.verifycap = verifycap
        self._started = False
        self._storage_broker = storage_broker
        self.share_consumer = self.node = node
        self.max_outstanding_requests = max_outstanding_requests
        self._hungry = False

        self._commonshares = {} # maps shnum to CommonShare instance
        self.pending_requests = set()
        self.overdue_requests = set() # subset of pending_requests
        self.overdue_timers = {}

        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a_l(self._storage_index[:8], 60)
        self._node_logparent = logparent
        self._download_status = download_status
        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
                           si=self._si_prefix,
                           level=log.NOISY, parent=logparent, umid="2xjj2A")

    def update_num_segments(self):
        (numsegs, authoritative) = self.node.get_num_segments()
        assert authoritative
        for cs in self._commonshares.values():
            cs.set_authoritative_num_segments(numsegs)

    def start_finding_servers(self):
        # don't get servers until somebody uses us: creating the
        # ImmutableFileNode should not cause work to happen yet. Test case
        # is test_dirnode, which creates us with storage_broker=None
        if not self._started:
            si = self.verifycap.storage_index
            servers = [(s.get_serverid(), s.get_rref())
                       for s in self._storage_broker.get_servers_for_psi(si)]
            self._servers = iter(servers)
            self._started = True

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._lp
        return log.msg(*args, **kwargs)

    def stop(self):
        self.running = False
        while self.overdue_timers:
            req, t = self.overdue_timers.popitem()
            t.cancel()

    # called by our parent CiphertextDownloader
    def hungry(self):
        self.log(format="ShareFinder[si=%(si)s] hungry",
                 si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
        self.start_finding_servers()
        self._hungry = True
        eventually(self.loop)

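    # A hedged sketch of the consumer protocol (assumed from the comments
    # and calls in this file, not an authoritative API description): the
    # parent CiphertextDownloader calls hungry() whenever it wants more
    # shares, and ShareFinder answers via eventual-send, never
    # synchronously:
    #
    #   finder = ShareFinder(storage_broker, verifycap, node, dl_status)
    #   finder.hungry()   # triggers loop(); DYHB queries go out
    #   # ...later, share_consumer.got_shares(shares) fires; the consumer
    #   # calls finder.hungry() again if it wants more, or finder.stop()
    #   # when it is done, cancelling any outstanding overdue timers.
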
    # internal methods
    def loop(self):
        pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
                              for rt in self.pending_requests]) # sort?
        self.log(format="ShareFinder loop: running=%(running)s"
                 " hungry=%(hungry)s, pending=%(pending)s",
                 running=self.running, hungry=self._hungry, pending=pending_s,
                 level=log.NOISY, umid="kRtS4Q")
        if not self.running:
            return
        if not self._hungry:
            return

        non_overdue = self.pending_requests - self.overdue_requests
        if len(non_overdue) >= self.max_outstanding_requests:
            # cannot send more requests, must wait for some to retire
            return

        server = None
        try:
            if self._servers:
                server = self._servers.next()
        except StopIteration:
            self._servers = None

        if server:
            self.send_request(server)
            # we loop again to get parallel queries. The check above will
            # prevent us from looping forever.
            eventually(self.loop)
            return

        if self.pending_requests:
            # no server, but there are still requests in flight: maybe one
            # of them will make progress
            return

        self.log(format="ShareFinder.loop: no_more_shares, ever",
                 level=log.UNUSUAL, umid="XjQlzg")
        # we've run out of servers (so we can't send any more requests), and
        # we have nothing in flight. No further progress can be made. They
        # are destined to remain hungry.
        eventually(self.share_consumer.no_more_shares)

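    # A worked example of the outstanding-request check in loop() above:
    # with max_outstanding_requests=10, 12 requests pending and 3 of them
    # overdue, non_overdue = 12 - 3 = 9 < 10, so one more request may
    # still be sent. Overdue requests stop counting against the limit, so
    # a handful of stalled servers cannot freeze the whole finder.
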
    def send_request(self, server):
        peerid, rref = server
        req = RequestToken(peerid)
        self.pending_requests.add(req)
        lp = self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="Io7pyg")
        time_sent = now()
        d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
        # TODO: get the timer from a Server object, it knows best
        self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
                                                     self.overdue, req)
        d = rref.callRemote("get_buckets", self._storage_index)
        d.addBoth(incidentally, self._request_retired, req)
        d.addCallbacks(self._got_response, self._got_error,
                       callbackArgs=(rref.version, peerid, req, d_ev,
                                     time_sent, lp),
                       errbackArgs=(peerid, req, d_ev, lp))
        d.addErrback(log.err, format="error in send_request",
                     level=log.WEIRD, parent=lp, umid="rpdV0w")
        d.addCallback(incidentally, eventually, self.loop)

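    # A note on the chain built in send_request() above: _request_retired
    # is attached with addBoth, so the token leaves pending_requests (and
    # its overdue timer is cancelled) before _got_response or _got_error
    # runs; the trailing addErrback logs anything the handlers raised, and
    # the final incidentally(..., eventually, self.loop) re-kicks loop()
    # so the freed request slot can be reused.
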
    def _request_retired(self, req):
        self.pending_requests.discard(req)
        self.overdue_requests.discard(req)
        if req in self.overdue_timers:
            self.overdue_timers[req].cancel()
            del self.overdue_timers[req]

    def overdue(self, req):
        del self.overdue_timers[req]
        assert req in self.pending_requests # paranoia, should never be false
        self.overdue_requests.add(req)
        eventually(self.loop)

    def _got_response(self, buckets, server_version, peerid, req, d_ev,
                      time_sent, lp):
        shnums = sorted(buckets)
        time_received = now()
        d_ev.finished(shnums, time_received)
        dyhb_rtt = time_received - time_sent
        if not buckets:
            self.log(format="no shares from [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="U7d4JA")
            return
        shnums_s = ",".join([str(shnum) for shnum in shnums])
        self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
                 shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY, parent=lp, umid="0fcEZw")
        shares = []
        for shnum, bucket in buckets.iteritems():
            s = self._create_share(shnum, bucket, server_version, peerid,
                                   dyhb_rtt)
            shares.append(s)
        self._deliver_shares(shares)

    def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
        if shnum in self._commonshares:
            cs = self._commonshares[shnum]
        else:
            numsegs, authoritative = self.node.get_num_segments()
            cs = CommonShare(numsegs, self._si_prefix, shnum,
                             self._node_logparent)
            if authoritative:
                cs.set_authoritative_num_segments(numsegs)
            # Share._get_satisfaction is responsible for updating
            # CommonShare.set_numsegs after we know the UEB. Alternatives:
            #  1: d = self.node.get_num_segments()
            #     d.addCallback(cs.got_numsegs)
            #   the problem is that the OneShotObserverList I was using
            #   inserts an eventual-send between _get_satisfaction's
            #   _satisfy_UEB and _satisfy_block_hash_tree, and the
            #   CommonShare didn't get the num_segs message before
            #   being asked to set block hash values. To resolve this
            #   would require an immediate ObserverList instead of
            #   an eventual-send -based one
            #  2: break _get_satisfaction into Deferred-attached pieces.
            #     Yuck.
            self._commonshares[shnum] = cs
        s = Share(bucket, server_version, self.verifycap, cs, self.node,
                  self._download_status, peerid, shnum, dyhb_rtt,
                  self._node_logparent)
        return s

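    # Note the split maintained by _create_share() above: one CommonShare
    # per shnum, reused across every server that offers that share number,
    # versus one Share per bucket (per server+shnum pair). State that is
    # identical for all copies of a share, like the authoritative segment
    # count, lives on the CommonShare.
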
    def _deliver_shares(self, shares):
        # they will call hungry() again if they want more
        self._hungry = False
        shares_s = ",".join([str(sh) for sh in shares])
        self.log(format="delivering shares: %s" % shares_s,
                 level=log.NOISY, umid="2n1qQw")
        eventually(self.share_consumer.got_shares, shares)

    def _got_error(self, f, peerid, req, d_ev, lp):
        d_ev.finished("error", now())
        self.log(format="got error from [%(peerid)s]",
                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")