From: Brian Warner Date: Mon, 5 Sep 2011 18:38:44 +0000 (-0700) Subject: Retrieve: rewrite flow-control: use a top-level loop() to catch all errors X-Git-Tag: allmydata-tahoe-1.9.0a2~53 X-Git-Url: https://git.rkrishnan.org/specifications/%5B/%5D%20/uri/status?a=commitdiff_plain;h=df07060f9377bff85a094017b5cbc46d948b9c93;p=tahoe-lafs%2Ftahoe-lafs.git Retrieve: rewrite flow-control: use a top-level loop() to catch all errors This ought to close the potential for dropped errors and hanging downloads. Verify needs to be examined, I may have broken it, although all tests pass. --- diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index cb4ba352..865b0e10 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -228,8 +228,6 @@ class Retrieve: self._setup_encoding_parameters() self.log("starting download") self._started_fetching = time.time() - d = self._add_active_peers() - # ... # The download process beyond this is a state machine. # _add_active_peers will select the peers that we want to use # for the download, and then attempt to start downloading. After @@ -238,8 +236,17 @@ class Retrieve: # peers before downloading all of the segments, _done_deferred # will errback. Otherwise, it will eventually callback with the # contents of the mutable file. + self.loop() return self._done_deferred + def loop(self): + d = fireEventually(None) # avoid #237 recursion limit problem + d.addCallback(lambda ign: self._activate_enough_peers()) + d.addCallback(lambda ign: self._download_current_segment()) + # when we're done, _download_current_segment will call _done. If we + # aren't, it will call loop() again. + d.addErrback(self._error) + def _setup_download(self): self._started = time.time() self._status.set_status("Retrieving Shares") @@ -421,7 +428,7 @@ class Retrieve: self._current_segment = self._start_segment - def _add_active_peers(self): + def _activate_enough_peers(self): """ I populate self._active_readers with enough active readers to retrieve the contents of this mutable file. I am called before @@ -449,35 +456,36 @@ class Retrieve: # of scope for MDMF, though.) # We need at least self._required_shares readers to download a - # segment. + # segment. If we're verifying, we need all shares. if self._verify: needed = self._total_shares else: - needed = self._required_shares - len(self._active_readers) + needed = self._required_shares # XXX: Why don't format= log messages work here? self.log("adding %d peers to the active peers list" % needed) + if len(self._active_readers) >= needed: + # enough shares are active + return + + more = needed - len(self._active_readers) + known_shnums = set(self.remaining_sharemap.keys()) + used_shnums = set([r.shnum for r in self._active_readers]) + unused_shnums = known_shnums - used_shnums # We favor lower numbered shares, since FEC is faster with # primary shares than with other shares, and lower-numbered # shares are more likely to be primary than higher numbered # shares. - active_shnums = set(sorted(self.remaining_sharemap.keys())) - # We shouldn't consider adding shares that we already have; this - # will cause problems later. - active_shnums -= set([reader.shnum for reader in self._active_readers]) - active_shnums = list(active_shnums)[:needed] - if len(active_shnums) < needed and not self._verify: + new_shnums = sorted(unused_shnums)[:more] + if len(new_shnums) < more and not self._verify: # We don't have enough readers to retrieve the file; fail. - return self._failed() + self._raise_notenoughshareserror() - for shnum in active_shnums: - self._active_readers.append(self.readers[shnum]) - self.log("added reader for share %d" % shnum) - assert len(self._active_readers) >= self._required_shares - new_readers = set(self._active_readers) - self._validated_readers - - for reader in new_readers: + for shnum in new_shnums: + reader = self.readers[shnum] + self._active_readers.append(reader) self._validated_readers.add(reader) + self.log("added reader for share %d" % shnum) # Each time we validate a reader, we check to see if we need the # private key. If we do, we politely ask for it and then continue # computing. If we find that we haven't gotten it at the end of @@ -487,8 +495,7 @@ class Retrieve: d.addCallback(self._try_to_validate_privkey, reader) # XXX: don't just drop the Deferred. We need error-reporting # but not flow-control here. - return self._download_current_segment() - + assert len(self._active_readers) >= self._required_shares def _try_to_validate_prefix(self, prefix, reader): """ @@ -588,23 +595,16 @@ class Retrieve: that this Retrieve is currently responsible for downloading. """ assert len(self._active_readers) >= self._required_shares - if self._current_segment <= self._last_segment: - d = self._process_segment(self._current_segment) - else: - d = defer.succeed(None) - d.addBoth(self._turn_barrier) - d.addCallback(self._check_for_done) + if self._current_segment > self._last_segment: + # No more segments to download, we're done. + self.log("got plaintext, done") + return self._done() + self.log("on segment %d of %d" % + (self._current_segment + 1, self._num_segments)) + d = self._process_segment(self._current_segment) + d.addCallback(lambda ign: self.loop()) return d - - def _turn_barrier(self, result): - """ - I help the download process avoid the recursion limit issues - discussed in #237. - """ - return fireEventually(result) - - def _process_segment(self, segnum): """ I download, validate, decode, and decrypt one segment of the @@ -958,50 +958,11 @@ class Retrieve: self._need_privkey = False - def _check_for_done(self, res): - """ - I check to see if this Retrieve object has successfully finished - its work. - - I can exit in the following ways: - - If there are no more segments to download, then I exit by - causing self._done_deferred to fire with the plaintext - content requested by the caller. - - If there are still segments to be downloaded, and there - are enough active readers (readers which have not broken - and have not given us corrupt data) to continue - downloading, I send control back to - _download_current_segment. - - If there are still segments to be downloaded but there are - not enough active peers to download them, I ask - _add_active_peers to add more peers. If it is successful, - it will call _download_current_segment. If there are not - enough peers to retrieve the file, then that will cause - _done_deferred to errback. - """ - self.log("checking for doneness") - if self._current_segment > self._last_segment: - # No more segments to download, we're done. - self.log("got plaintext, done") - return self._done() - - if len(self._active_readers) >= self._required_shares: - # More segments to download, but we have enough good peers - # in self._active_readers that we can do that without issue, - # so go nab the next segment. - self.log("not done yet: on segment %d of %d" % \ - (self._current_segment + 1, self._num_segments)) - return self._download_current_segment() - - self.log("not done yet: on segment %d of %d, need to add peers" % \ - (self._current_segment + 1, self._num_segments)) - return self._add_active_peers() - def _done(self): """ - I am called by _check_for_done when the download process has - finished successfully. After making some useful logging + I am called by _download_current_segment when the download process + has finished successfully. After making some useful logging statements, I return the decrypted contents to the owner of this Retrieve object through self._done_deferred. """ @@ -1029,36 +990,34 @@ class Retrieve: eventually(self._done_deferred.callback, ret) - def _failed(self): + def _raise_notenoughshareserror(self): """ - I am called by _add_active_peers when there are not enough + I am called by _activate_enough_peers when there are not enough active peers left to complete the download. After making some - useful logging statements, I return an exception to that effect + useful logging statements, I throw an exception to that effect to the caller of this Retrieve object through self._done_deferred. """ + + format = ("ran out of peers: " + "have %(have)d of %(total)d segments " + "found %(bad)d bad shares " + "encoding %(k)d-of-%(n)d") + args = {"have": self._current_segment, + "total": self._num_segments, + "need": self._last_segment, + "k": self._required_shares, + "n": self._total_shares, + "bad": len(self._bad_shares)} + raise NotEnoughSharesError("%s, last failure: %s" % + (format % args, str(self._last_failure))) + + def _error(self, f): + # all errors, including NotEnoughSharesError, land here self._running = False self._status.set_active(False) now = time.time() self._status.timings['total'] = now - self._started self._status.timings['fetch'] = now - self._started_fetching self._status.set_status("Failed") - - if self._verify: - ret = list(self._bad_shares) - else: - format = ("ran out of peers: " - "have %(have)d of %(total)d segments " - "found %(bad)d bad shares " - "encoding %(k)d-of-%(n)d") - args = {"have": self._current_segment, - "total": self._num_segments, - "need": self._last_segment, - "k": self._required_shares, - "n": self._total_shares, - "bad": len(self._bad_shares)} - e = NotEnoughSharesError("%s, last failure: %s" % \ - (format % args, str(self._last_failure))) - f = failure.Failure(e) - ret = f - eventually(self._done_deferred.callback, ret) + eventually(self._done_deferred.errback, f)