2 from twisted.python.failure import Failure
3 from foolscap.api import eventually
4 from allmydata.interfaces import NotEnoughSharesError, NoSharesError
5 from allmydata.util import log
6 from allmydata.util.dictutil import DictOfSets
7 from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
11 """I am responsible for acquiring blocks for a single segment. I will use
12 the Share instances passed to my add_shares() method to locate, retrieve,
13 and validate those blocks. I expect my parent node to call my
14 no_more_shares() method when there are no more shares available. I will
15 call my parent's want_more_shares() method when I want more: I expect to
16 see at least one call to add_shares or no_more_shares afterwards.
18 When I have enough validated blocks, I will call my parent's
19 process_blocks() method with a dictionary that maps shnum to blockdata.
20 If I am unable to provide enough blocks, I will call my parent's
21 fetch_failed() method with (self, f). After either of these events, I
22 will shut down and do no further work. My parent can also call my stop()
23 method to have me shut down early."""
    def __init__(self, node, segnum, k, logparent):
        """Set up tracking state for fetching the blocks of one segment.

        NOTE(review): this extraction is missing a few original lines;
        other methods read self.segnum, self._k, self._lp and self._running,
        which are presumably assigned here — confirm against the full file.
        """
        self._node = node # _Node
        self._shares = [] # unused Share instances, sorted by "goodness"
                          # (RTT), then shnum. This is populated when DYHB
                          # responses arrive, or (for later segments) at
                          # startup. We remove shares from it when we call
                          # sh.get_block() on them.
        self._shares_from_server = DictOfSets() # maps serverid to set of
                                                # Shares on that server for
                                                # which we have outstanding
                                                # requests
        self._max_shares_per_server = 1 # how many Shares we're allowed to
                                        # pull from each server. This starts
                                        # at 1 and grows if we don't have
                                        # sufficient diversity.
        self._active_share_map = {} # maps shnum to outstanding (and not
                                    # OVERDUE) Share that provides it.
        self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
        self._share_observers = {} # maps Share to EventStreamObserver for
                                   # the outstanding get_block() request
        self._blocks = {} # maps shnum to validated block data
        self._no_more_shares = False # set by no_more_shares()
        self._last_failure = None # most recent Failure, for error reporting
        # NOTE(review): the 'def stop(self):' header (and likely a
        # 'self._running = False' assignment) were lost in this extraction;
        # these lines are the body of the stop() method described in the
        # class docstring.
        log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
                level=log.NOISY, parent=self._lp, umid="LWyqpg")
        self._cancel_all_requests()
        # drop references to help the garbage collector; after stop() this
        # fetcher does no further work
        del self._shares, self._shares_from_server, self._active_share_map
        del self._share_observers
63 # called by our parent _Node
65 def add_shares(self, shares):
66 # called when ShareFinder locates a new share, and when a non-initial
67 # segment fetch is started and we already know about shares from the
69 self._shares.extend(shares)
70 self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) )
73 def no_more_shares(self):
74 # ShareFinder tells us it's reached the end of its list
75 self._no_more_shares = True
        # if any exception occurs here, kill the download
        # NOTE(review): this appears to be the exception path of loop()'s
        # try/except (the 'def loop', 'try:' and 'except' lines were lost
        # in extraction): any failure during the loop body is reported to
        # the parent _Node via fetch_failed().
        self._node.fetch_failed(self, Failure())
        # NOTE(review): the 'def' header of this loop-body method and
        # several structural lines (guards, 'return'/'continue' statements,
        # and the tails of two multi-line conditions) were lost in this
        # extraction. The gaps are marked below; reconcile against the
        # full file before trusting control flow here.
        numsegs, authoritative = self._node.get_num_segments()
        if authoritative and self.segnum >= numsegs:
            # oops, we were asking for a segment number beyond the end of the
            # file. This is an error.
            e = BadSegmentNumberError("segnum=%d, numsegs=%d" %
                                      (self.segnum, self._node.num_segments))
            # NOTE(review): 'f' is not assigned in the visible lines;
            # presumably 'f = Failure(e)' (and a 'return') belonged here.
            self._node.fetch_failed(self, f)
        #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares
        # Should we send out more requests?
        # NOTE(review): the tail of this condition (the closing paren, a
        # '< k' comparison, and the ':') is missing.
        while len(set(self._blocks.keys())
                  | set(self._active_share_map.keys())
            # we don't have data or active requests for enough shares. Are
            # there any unused shares we can start using?
            (sent_something, want_more_diversity) = self._find_and_use_share()
            # NOTE(review): an 'if sent_something:' guard (with a
            # 'continue') appears to be missing around this comment.
            # great. loop back around in case we need to send more.
            if want_more_diversity:
                # we could have sent something if we'd been allowed to pull
                # more shares per server. Increase the limit and try again.
                self._max_shares_per_server += 1
                log.msg("SegmentFetcher(%s) increasing diversity limit to %d"
                        % (self._node._si_prefix, self._max_shares_per_server),
                        level=log.NOISY, umid="xY2pBA")
                # Also ask for more shares, in the hopes of achieving better
                # diversity for the next segment.
                self._ask_for_more_shares()
            # we need more shares than the ones in self._shares to make
            # progress
            self._ask_for_more_shares()
            if self._no_more_shares:
                # But there are no more shares to be had. If we're going to
                # succeed, it will be with the shares we've already seen.
                # Will they be enough?
                # NOTE(review): the tail of this condition ('< k:' etc.)
                # is missing.
                if len(set(self._blocks.keys())
                       | set(self._active_share_map.keys())
                       | set(self._overdue_share_map.keys())
                    self._no_shares_error() # this calls self.stop()
                # our outstanding or overdue requests may yet work.
            # more shares may be coming. Wait until then.
        # success check: once k distinct validated blocks are in hand,
        # deliver them to the parent for decoding.
        if len(set(self._blocks.keys())) >= k:
            self._node.process_blocks(self.segnum, self._blocks)
150 def _no_shares_error(self):
151 if not (self._shares or self._active_share_map or
152 self._overdue_share_map or self._blocks):
153 format = ("no shares (need %(k)d)."
154 " Last failure: %(last_failure)s")
155 args = { "k": self._k,
156 "last_failure": self._last_failure }
157 error = NoSharesError
159 format = ("ran out of shares: complete=%(complete)s"
160 " pending=%(pending)s overdue=%(overdue)s"
161 " unused=%(unused)s need %(k)d."
162 " Last failure: %(last_failure)s")
163 def join(shnums): return ",".join(["sh%d" % shnum
164 for shnum in sorted(shnums)])
165 pending_s = ",".join([str(sh)
166 for sh in self._active_share_map.values()])
168 for shares in self._overdue_share_map.values():
170 overdue_s = ",".join([str(sh) for sh in overdue])
171 args = {"complete": join(self._blocks.keys()),
172 "pending": pending_s,
173 "overdue": overdue_s,
174 # 'unused' should be zero
175 "unused": ",".join([str(sh) for sh in self._shares]),
177 "last_failure": self._last_failure,
179 error = NotEnoughSharesError
180 log.msg(format=format,
181 level=log.UNUSUAL, parent=self._lp, umid="1DsnTg",
183 e = error(format % args)
186 self._node.fetch_failed(self, f)
188 def _find_and_use_share(self):
189 sent_something = False
190 want_more_diversity = False
191 for sh in self._shares: # find one good share to fetch
192 shnum = sh._shnum ; serverid = sh._peerid
193 if shnum in self._blocks:
194 continue # don't request data we already have
195 if shnum in self._active_share_map:
196 # note: OVERDUE shares are removed from _active_share_map
197 # and added to _overdue_share_map instead.
198 continue # don't send redundant requests
199 sfs = self._shares_from_server
200 if len(sfs.get(serverid,set())) >= self._max_shares_per_server:
201 # don't pull too much from a single server
202 want_more_diversity = True
204 # ok, we can use this share
205 self._shares.remove(sh)
206 self._active_share_map[shnum] = sh
207 self._shares_from_server.add(serverid, sh)
208 self._start_share(sh, shnum)
209 sent_something = True
211 return (sent_something, want_more_diversity)
213 def _start_share(self, share, shnum):
214 self._share_observers[share] = o = share.get_block(self.segnum)
215 o.subscribe(self._block_request_activity, share=share, shnum=shnum)
217 def _ask_for_more_shares(self):
218 if not self._no_more_shares:
219 self._node.want_more_shares()
220 # that will trigger the ShareFinder to keep looking, and call our
221 # add_shares() or no_more_shares() later.
223 def _cancel_all_requests(self):
224 for o in self._share_observers.values():
226 self._share_observers = {}
228 def _block_request_activity(self, share, shnum, state, block=None, f=None):
229 # called by Shares, in response to our s.send_request() calls.
230 if not self._running:
232 log.msg("SegmentFetcher(%s)._block_request_activity:"
233 " Share(sh%d-on-%s) -> %s" %
234 (self._node._si_prefix, shnum, share._peerid_s, state),
235 level=log.NOISY, parent=self._lp, umid="vilNWA")
236 # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
237 # from all our tracking lists.
238 if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
239 self._share_observers.pop(share, None)
240 self._shares_from_server.discard(shnum, share)
241 if self._active_share_map.get(shnum) is share:
242 del self._active_share_map[shnum]
243 self._overdue_share_map.discard(shnum, share)
245 if state is COMPLETE:
246 # 'block' is fully validated and complete
247 self._blocks[shnum] = block
250 # no longer active, but still might complete
251 del self._active_share_map[shnum]
252 self._overdue_share_map.add(shnum, share)
253 # OVERDUE is not terminal: it will eventually transition to
254 # COMPLETE, CORRUPT, or DEAD.
257 self._last_failure = f
258 if state is BADSEGNUM:
259 # our main loop will ask the DownloadNode each time for the
260 # number of segments, so we'll deal with this in the top of
264 eventually(self.loop)