from twisted.python.failure import Failure
from foolscap.api import eventually
from allmydata.interfaces import NotEnoughSharesError, NoSharesError
from allmydata.util import log
from allmydata.util.dictutil import DictOfSets
from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
     BadSegmentNumberError

class SegmentFetcher:
    """I am responsible for acquiring blocks for a single segment. I will use
    the Share instances passed to my add_shares() method to locate, retrieve,
    and validate those blocks. I expect my parent node to call my
    no_more_shares() method when there are no more shares available. I will
    call my parent's want_more_shares() method when I want more: I expect to
    see at least one call to add_shares() or no_more_shares() afterwards.

    When I have enough validated blocks, I will call my parent's
    process_blocks() method with a dictionary that maps shnum to blockdata.
    If I am unable to provide enough blocks, I will call my parent's
    fetch_failed() method with (self, f). After either of these events, I
    will shut down and do no further work. My parent can also call my stop()
    method to have me shut down early."""

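    # A sketch of the parent-side interface described above. This stub is
    # purely illustrative (the real parent is the DownloadNode in this
    # package); only the method names and call shapes are taken from this
    # file:
    #
    #     class ParentStub:
    #         def want_more_shares(self):
    #             pass # kick the ShareFinder; it should eventually call our
    #                  # add_shares() or no_more_shares()
    #         def process_blocks(self, segnum, blocks):
    #             pass # 'blocks' maps shnum to validated blockdata
    #         def fetch_failed(self, fetcher, f):
    #             pass # 'f' is a twisted.python.failure.Failure
    #         def get_num_segments(self):
    #             return (1, True) # (num_segments, authoritative)
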
    def __init__(self, node, segnum, k, logparent):
        self._node = node # _Node
        self.segnum = segnum
        self._k = k
        self._shares = [] # unused Share instances, sorted by "goodness"
                          # (RTT), then shnum. This is populated when DYHB
                          # responses arrive, or (for later segments) at
                          # startup. We remove shares from it when we call
                          # sh.get_block() on them.
        self._shares_from_server = DictOfSets() # maps serverid to set of
                                                # Shares on that server for
                                                # which we have outstanding
                                                # get_block() calls.
        self._max_shares_per_server = 1 # how many Shares we're allowed to
                                        # pull from each server. This starts
                                        # at 1 and grows if we don't have
                                        # sufficient diversity.
        self._active_share_map = {} # maps shnum to outstanding (and not
                                    # OVERDUE) Share that provides it.
        self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
        self._lp = logparent
        self._share_observers = {} # maps Share to EventStreamObserver for
                                   # active ones
        self._blocks = {} # maps shnum to validated block data
        self._no_more_shares = False
        self._last_failure = None
        self._running = True

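    # DictOfSets (from allmydata.util.dictutil) is used here as a dict that
    # maps each key to a set of values. A minimal sketch of the assumed
    # semantics, as exercised by this file (illustrative, not the real
    # implementation):
    #
    #     d = DictOfSets()
    #     d.add("serverid", share_a)      # d["serverid"] == set([share_a])
    #     d.add("serverid", share_b)      # the set grows
    #     d.discard("serverid", share_a)  # removes share_a if present
    #     d.get("serverid", set())        # plain dict lookup still works
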
    def stop(self):
        log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
                level=log.NOISY, parent=self._lp, umid="LWyqpg")
        self._cancel_all_requests()
        self._running = False
        # drop our references to help the garbage collector
        del self._shares, self._shares_from_server, self._active_share_map
        del self._share_observers

    # called by our parent _Node

    def add_shares(self, shares):
        # called when ShareFinder locates a new share, and when a non-initial
        # segment fetch is started and we already know about shares from the
        # previous segment
        self._shares.extend(shares)
        self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum))
        eventually(self.loop)

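    # Ordering example (illustrative values): given shares with
    # (_dyhb_rtt, _shnum) of (0.08, 0), (0.02, 3), and (0.02, 1), the sort
    # above yields sh1, sh3, sh0 -- fastest server first, ties broken by
    # the lower share number.
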
    def no_more_shares(self):
        # ShareFinder tells us it's reached the end of its list
        self._no_more_shares = True
        eventually(self.loop)

    # internal methods

    def loop(self):
        try:
            # if any exception occurs here, kill the download
            self._do_loop()
        except BaseException:
            self._node.fetch_failed(self, Failure())
            raise

    def _do_loop(self):
        k = self._k
        if not self._running:
            return
        numsegs, authoritative = self._node.get_num_segments()
        if authoritative and self.segnum >= numsegs:
            # oops, we were asking for a segment number beyond the end of the
            # file. This is an error.
            self.stop()
            e = BadSegmentNumberError("segnum=%d, numsegs=%d" %
                                      (self.segnum, numsegs))
            f = Failure(e)
            self._node.fetch_failed(self, f)
            return

        #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares
        # Should we send out more requests?
        while len(set(self._blocks.keys())
                  | set(self._active_share_map.keys())
                  ) < k:
            # we don't have data or active requests for enough shares. Are
            # there any unused shares we can start using?
            (sent_something, want_more_diversity) = self._find_and_use_share()
            if sent_something:
                # great. loop back around in case we need to send more.
                continue
            if want_more_diversity:
                # we could have sent something if we'd been allowed to pull
                # more shares per server. Increase the limit and try again.
                self._max_shares_per_server += 1
                log.msg("SegmentFetcher(%s) increasing diversity limit to %d"
                        % (self._node._si_prefix, self._max_shares_per_server),
                        level=log.NOISY, umid="xY2pBA")
                # Also ask for more shares, in the hopes of achieving better
                # diversity for the next segment.
                self._ask_for_more_shares()
                continue
            # we need more shares than the ones in self._shares to make
            # progress
            self._ask_for_more_shares()
            if self._no_more_shares:
                # But there are no more shares to be had. If we're going to
                # succeed, it will be with the shares we've already seen.
                # Will they be enough?
                if len(set(self._blocks.keys())
                       | set(self._active_share_map.keys())
                       | set(self._overdue_share_map.keys())
                       ) < k:
                    # nope. bail.
                    self._no_shares_error() # this calls self.stop()
                    return
                # our outstanding or overdue requests may yet work.
            # more shares may be coming. Wait until then.
            return

        # are we done?
        if len(self._blocks) >= k:
            # yay!
            self.stop()
            self._node.process_blocks(self.segnum, self._blocks)
            return

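    # The progress tests above, restated as a hypothetical standalone helper
    # (names are illustrative; the real code operates on the instance
    # attributes directly):
    #
    #     def coverage(blocks, active, overdue=()):
    #         # a shnum counts once, whether we already hold its block or
    #         # merely have a request outstanding (or overdue) for it
    #         return len(set(blocks) | set(active) | set(overdue))
    #
    # The send-more loop runs while coverage(blocks, active) < k, and we
    # only give up when even coverage(blocks, active, overdue) falls short
    # of k, since an OVERDUE request may still complete.
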
    def _no_shares_error(self):
        if not (self._shares or self._active_share_map or
                self._overdue_share_map or self._blocks):
            format = ("no shares (need %(k)d)."
                      " Last failure: %(last_failure)s")
            args = {"k": self._k,
                    "last_failure": self._last_failure}
            error = NoSharesError
        else:
            format = ("ran out of shares: complete=%(complete)s"
                      " pending=%(pending)s overdue=%(overdue)s"
                      " unused=%(unused)s need %(k)d."
                      " Last failure: %(last_failure)s")
            def join(shnums):
                return ",".join(["sh%d" % shnum for shnum in sorted(shnums)])
            pending_s = ",".join([str(sh)
                                  for sh in self._active_share_map.values()])
            overdue = set()
            for shares in self._overdue_share_map.values():
                overdue |= shares
            overdue_s = ",".join([str(sh) for sh in overdue])
            args = {"complete": join(self._blocks.keys()),
                    "pending": pending_s,
                    "overdue": overdue_s,
                    # 'unused' should be empty
                    "unused": ",".join([str(sh) for sh in self._shares]),
                    "k": self._k,
                    "last_failure": self._last_failure,
                    }
            error = NotEnoughSharesError
        log.msg(format=format,
                level=log.UNUSUAL, parent=self._lp, umid="1DsnTg",
                **args)
        e = error(format % args)
        f = Failure(e)
        self.stop()
        self._node.fetch_failed(self, f)

    def _find_and_use_share(self):
        sent_something = False
        want_more_diversity = False
        for sh in self._shares: # find one good share to fetch
            shnum = sh._shnum
            serverid = sh._peerid
            if shnum in self._blocks:
                continue # don't request data we already have
            if shnum in self._active_share_map:
                # note: OVERDUE shares are removed from _active_share_map
                # and added to _overdue_share_map instead.
                continue # don't send redundant requests
            sfs = self._shares_from_server
            if len(sfs.get(serverid, set())) >= self._max_shares_per_server:
                # don't pull too much from a single server
                want_more_diversity = True
                continue
            # ok, we can use this share
            self._shares.remove(sh)
            self._active_share_map[shnum] = sh
            self._shares_from_server.add(serverid, sh)
            self._start_share(sh, shnum)
            sent_something = True
            break
        return (sent_something, want_more_diversity)

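    # Worked example of the diversity limit (hypothetical servers/shares):
    # with _max_shares_per_server=1, if serverA holds sh0 and sh1 and sh0 is
    # already in flight on serverA, sh1 is skipped and this method reports
    # want_more_diversity=True. _do_loop() then raises the limit to 2, and
    # the next pass is allowed to fetch sh1 from serverA as well.
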
    def _start_share(self, share, shnum):
        self._share_observers[share] = o = share.get_block(self.segnum)
        o.subscribe(self._block_request_activity, share=share, shnum=shnum)

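    # The EventStreamObserver returned by share.get_block() is assumed to
    # invoke its subscriber as callback(state, block=..., f=...), merged
    # with the keyword args passed to subscribe() above, which is how each
    # event lands in _block_request_activity(share, shnum, state, ...).
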
    def _ask_for_more_shares(self):
        if not self._no_more_shares:
            self._node.want_more_shares()
            # that will trigger the ShareFinder to keep looking, and call our
            # add_shares() or no_more_shares() later.

    def _cancel_all_requests(self):
        for o in self._share_observers.values():
            o.cancel()
        self._share_observers = {}

    def _block_request_activity(self, share, shnum, state, block=None, f=None):
        # called by Shares, in response to the get_block() requests we made
        # in _start_share()
        if not self._running:
            return
        log.msg("SegmentFetcher(%s)._block_request_activity:"
                " Share(sh%d-on-%s) -> %s" %
                (self._node._si_prefix, shnum, share._peerid_s, state),
                level=log.NOISY, parent=self._lp, umid="vilNWA")
        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
        # from all our tracking lists.
        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
            self._share_observers.pop(share, None)
            # _shares_from_server is keyed by serverid, not shnum
            self._shares_from_server.discard(share._peerid, share)
            if self._active_share_map.get(shnum) is share:
                del self._active_share_map[shnum]
            self._overdue_share_map.discard(shnum, share)

        if state is COMPLETE:
            # 'block' is fully validated and complete
            self._blocks[shnum] = block

        if state is OVERDUE:
            # no longer active, but still might complete
            del self._active_share_map[shnum]
            self._overdue_share_map.add(shnum, share)
            # OVERDUE is not terminal: it will eventually transition to
            # COMPLETE, CORRUPT, or DEAD.

        if state is DEAD:
            self._last_failure = f
        if state is BADSEGNUM:
            # our main loop will ask the DownloadNode each time for the
            # number of segments, so we'll deal with this in the top of
            # _do_loop
            pass

        eventually(self.loop)
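
# A self-contained sketch of the diversity-seeking selection above, reduced
# to plain (shnum, serverid) tuples. This is hypothetical demo code, not
# part of the downloader; run this file directly to see it.

def _demo_select(shares, blocks, active, per_server, limit):
    # mirrors _find_and_use_share(): pick the first usable share, noting
    # whether a higher per-server limit would have unlocked another one
    want_more_diversity = False
    for (shnum, serverid) in shares:
        if shnum in blocks or shnum in active:
            continue # already have, or already asked for, this shnum
        if len(per_server.get(serverid, set())) >= limit:
            want_more_diversity = True # blocked only by the per-server limit
            continue
        return ((shnum, serverid), want_more_diversity)
    return (None, want_more_diversity)

if __name__ == "__main__":
    shares = [(0, "srvA"), (1, "srvA"), (2, "srvB")] # already sorted by RTT
    # nothing in flight yet: the best share wins, no diversity pressure
    print(_demo_select(shares, set(), set(), {}, 1))
    # -> ((0, 'srvA'), False)
    # with sh0 outstanding on srvA, sh1 is blocked by the limit, so we fall
    # through to sh2 on srvB and report that a higher limit would have helped
    print(_demo_select(shares, set(), set([0]), {"srvA": set([0])}, 1))
    # -> ((2, 'srvB'), True)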