From df07060f9377bff85a094017b5cbc46d948b9c93 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Mon, 5 Sep 2011 11:38:44 -0700
Subject: [PATCH] Retrieve: rewrite flow-control: use a top-level loop() to
 catch all errors

This ought to close the potential for dropped errors and hanging downloads.
Verify needs to be examined, I may have broken it, although all tests pass.
---
 src/allmydata/mutable/retrieve.py | 157 +++++++++++-------------------
 1 file changed, 58 insertions(+), 99 deletions(-)

diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index cb4ba352..865b0e10 100644
--- a/src/allmydata/mutable/retrieve.py
+++ b/src/allmydata/mutable/retrieve.py
@@ -228,8 +228,6 @@ class Retrieve:
         self._setup_encoding_parameters()
         self.log("starting download")
         self._started_fetching = time.time()
-        d = self._add_active_peers()
-        # ...
         # The download process beyond this is a state machine.
         # _add_active_peers will select the peers that we want to use
         # for the download, and then attempt to start downloading. After
@@ -238,8 +236,17 @@ class Retrieve:
         # peers before downloading all of the segments, _done_deferred
         # will errback.  Otherwise, it will eventually callback with the
         # contents of the mutable file.
+        self.loop()
         return self._done_deferred
 
+    def loop(self):
+        d = fireEventually(None) # avoid #237 recursion limit problem
+        d.addCallback(lambda ign: self._activate_enough_peers())
+        d.addCallback(lambda ign: self._download_current_segment())
+        # when we're done, _download_current_segment will call _done. If we
+        # aren't, it will call loop() again.
+        d.addErrback(self._error)
+
     def _setup_download(self):
         self._started = time.time()
         self._status.set_status("Retrieving Shares")
@@ -421,7 +428,7 @@ class Retrieve:
 
         self._current_segment = self._start_segment
 
-    def _add_active_peers(self):
+    def _activate_enough_peers(self):
         """
         I populate self._active_readers with enough active readers to
         retrieve the contents of this mutable file. I am called before
@@ -449,35 +456,36 @@ class Retrieve:
         #  of scope for MDMF, though.)
 
         # We need at least self._required_shares readers to download a
-        # segment.
+        # segment. If we're verifying, we need all shares.
         if self._verify:
             needed = self._total_shares
         else:
-            needed = self._required_shares - len(self._active_readers)
+            needed = self._required_shares
         # XXX: Why don't format= log messages work here?
         self.log("adding %d peers to the active peers list" % needed)
 
+        if len(self._active_readers) >= needed:
+            # enough shares are active
+            return
+
+        more = needed - len(self._active_readers)
+        known_shnums = set(self.remaining_sharemap.keys())
+        used_shnums = set([r.shnum for r in self._active_readers])
+        unused_shnums = known_shnums - used_shnums
         # We favor lower numbered shares, since FEC is faster with
         # primary shares than with other shares, and lower-numbered
         # shares are more likely to be primary than higher numbered
         # shares.
-        active_shnums = set(sorted(self.remaining_sharemap.keys()))
-        # We shouldn't consider adding shares that we already have; this
-        # will cause problems later.
-        active_shnums -= set([reader.shnum for reader in self._active_readers])
-        active_shnums = list(active_shnums)[:needed]
-        if len(active_shnums) < needed and not self._verify:
+        new_shnums = sorted(unused_shnums)[:more]
+        if len(new_shnums) < more and not self._verify:
             # We don't have enough readers to retrieve the file; fail.
-            return self._failed()
+            self._raise_notenoughshareserror()
 
-        for shnum in active_shnums:
-            self._active_readers.append(self.readers[shnum])
-            self.log("added reader for share %d" % shnum)
-        assert len(self._active_readers) >= self._required_shares
-        new_readers = set(self._active_readers) - self._validated_readers
-
-        for reader in new_readers:
+        for shnum in new_shnums:
+            reader = self.readers[shnum]
+            self._active_readers.append(reader)
             self._validated_readers.add(reader)
+            self.log("added reader for share %d" % shnum)
             # Each time we validate a reader, we check to see if we need the
             # private key. If we do, we politely ask for it and then continue
             # computing. If we find that we haven't gotten it at the end of
@@ -487,8 +495,7 @@ class Retrieve:
                 d.addCallback(self._try_to_validate_privkey, reader)
                 # XXX: don't just drop the Deferred. We need error-reporting
                 # but not flow-control here.
-        return self._download_current_segment()
-
+        assert len(self._active_readers) >= self._required_shares
 
     def _try_to_validate_prefix(self, prefix, reader):
         """
@@ -588,23 +595,16 @@ class Retrieve:
         that this Retrieve is currently responsible for downloading.
         """
         assert len(self._active_readers) >= self._required_shares
-        if self._current_segment <= self._last_segment:
-            d = self._process_segment(self._current_segment)
-        else:
-            d = defer.succeed(None)
-        d.addBoth(self._turn_barrier)
-        d.addCallback(self._check_for_done)
+        if self._current_segment > self._last_segment:
+            # No more segments to download, we're done.
+            self.log("got plaintext, done")
+            return self._done()
+        self.log("on segment %d of %d" %
+                 (self._current_segment + 1, self._num_segments))
+        d = self._process_segment(self._current_segment)
+        d.addCallback(lambda ign: self.loop())
         return d
 
-
-    def _turn_barrier(self, result):
-        """
-        I help the download process avoid the recursion limit issues
-        discussed in #237.
-        """
-        return fireEventually(result)
-
-
     def _process_segment(self, segnum):
         """
         I download, validate, decode, and decrypt one segment of the
@@ -958,50 +958,11 @@ class Retrieve:
         self._need_privkey = False
 
 
-    def _check_for_done(self, res):
-        """
-        I check to see if this Retrieve object has successfully finished
-        its work.
-
-        I can exit in the following ways:
-            - If there are no more segments to download, then I exit by
-              causing self._done_deferred to fire with the plaintext
-              content requested by the caller.
-            - If there are still segments to be downloaded, and there
-              are enough active readers (readers which have not broken
-              and have not given us corrupt data) to continue
-              downloading, I send control back to
-              _download_current_segment.
-            - If there are still segments to be downloaded but there are
-              not enough active peers to download them, I ask
-              _add_active_peers to add more peers. If it is successful,
-              it will call _download_current_segment. If there are not
-              enough peers to retrieve the file, then that will cause
-              _done_deferred to errback.
-        """
-        self.log("checking for doneness")
-        if self._current_segment > self._last_segment:
-            # No more segments to download, we're done.
-            self.log("got plaintext, done")
-            return self._done()
-
-        if len(self._active_readers) >= self._required_shares:
-            # More segments to download, but we have enough good peers
-            # in self._active_readers that we can do that without issue,
-            # so go nab the next segment.
-            self.log("not done yet: on segment %d of %d" % \
-                     (self._current_segment + 1, self._num_segments))
-            return self._download_current_segment()
-
-        self.log("not done yet: on segment %d of %d, need to add peers" % \
-                 (self._current_segment + 1, self._num_segments))
-        return self._add_active_peers()
-
 
     def _done(self):
         """
-        I am called by _check_for_done when the download process has
-        finished successfully. After making some useful logging
+        I am called by _download_current_segment when the download process
+        has finished successfully. After making some useful logging
         statements, I return the decrypted contents to the owner of this
         Retrieve object through self._done_deferred.
         """
@@ -1029,36 +990,34 @@ class Retrieve:
         eventually(self._done_deferred.callback, ret)
 
 
-    def _failed(self):
+    def _raise_notenoughshareserror(self):
         """
-        I am called by _add_active_peers when there are not enough
+        I am called by _activate_enough_peers when there are not enough
         active peers left to complete the download. After making some
-        useful logging statements, I return an exception to that effect
+        useful logging statements, I throw an exception to that effect
         to the caller of this Retrieve object through
         self._done_deferred.
         """
+
+        format = ("ran out of peers: "
+                  "have %(have)d of %(total)d segments "
+                  "found %(bad)d bad shares "
+                  "encoding %(k)d-of-%(n)d")
+        args = {"have": self._current_segment,
+                "total": self._num_segments,
+                "need": self._last_segment,
+                "k": self._required_shares,
+                "n": self._total_shares,
+                "bad": len(self._bad_shares)}
+        raise NotEnoughSharesError("%s, last failure: %s" %
+                                   (format % args, str(self._last_failure)))
+
+    def _error(self, f):
+        # all errors, including NotEnoughSharesError, land here
         self._running = False
         self._status.set_active(False)
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
         self._status.set_status("Failed")
-
-        if self._verify:
-            ret = list(self._bad_shares)
-        else:
-            format = ("ran out of peers: "
-                      "have %(have)d of %(total)d segments "
-                      "found %(bad)d bad shares "
-                      "encoding %(k)d-of-%(n)d")
-            args = {"have": self._current_segment,
-                    "total": self._num_segments,
-                    "need": self._last_segment,
-                    "k": self._required_shares,
-                    "n": self._total_shares,
-                    "bad": len(self._bad_shares)}
-            e = NotEnoughSharesError("%s, last failure: %s" % \
-                                     (format % args, str(self._last_failure)))
-            f = failure.Failure(e)
-            ret = f
-        eventually(self._done_deferred.callback, ret)
+        eventually(self._done_deferred.errback, f)
-- 
2.45.2