]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable/retrieve.py
Retrieve._activate_enough_peers: rewrite Verify logic
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / retrieve.py
index 100350a0b62b00925bfae12dddd70b56649665e6..fc113695231bcefd5179a70f043cfaa81deba565 100644 (file)
@@ -7,7 +7,7 @@ from twisted.python import failure
 from twisted.internet.interfaces import IPushProducer, IConsumer
 from foolscap.api import eventually, fireEventually
 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
-                                 MDMF_VERSION, SDMF_VERSION
+     DownloadStopped, MDMF_VERSION, SDMF_VERSION
 from allmydata.util import hashutil, log, mathutil
 from allmydata.util.dictutil import DictOfSets
 from allmydata import hashtree, codec
@@ -143,6 +143,7 @@ class Retrieve:
         self._status.set_size(datalength)
         self._status.set_encoding(k, N)
         self.readers = {}
+        self._stopped = False
         self._pause_deferred = None
         self._offset = None
         self._read_length = None
@@ -196,6 +197,10 @@ class Retrieve:
 
         eventually(p.callback, None)
 
+    def stopProducing(self):
+        self._stopped = True
+        self.resumeProducing()
+
 
     def _check_for_paused(self, res):
         """
@@ -205,6 +210,8 @@ class Retrieve:
         the Deferred fires immediately. Otherwise, the Deferred fires
         when the downloader is unpaused.
         """
+        if self._stopped:
+            raise DownloadStopped("our Consumer called stopProducing()")
         if self._pause_deferred is not None:
             d = defer.Deferred()
             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
@@ -453,32 +460,29 @@ class Retrieve:
         #  instead of just reasoning about what the effect might be. Out
         #  of scope for MDMF, though.)
 
-        # We need at least self._required_shares readers to download a
-        # segment. If we're verifying, we need all shares.
-        if self._verify:
-            needed = self._total_shares
-        else:
-            needed = self._required_shares
         # XXX: Why don't format= log messages work here?
-        self.log("adding %d peers to the active peers list" % needed)
-
-        if len(self._active_readers) >= needed:
-            # enough shares are active
-            return
 
-        more = needed - len(self._active_readers)
         known_shnums = set(self.remaining_sharemap.keys())
         used_shnums = set([r.shnum for r in self._active_readers])
         unused_shnums = known_shnums - used_shnums
-        # We favor lower numbered shares, since FEC is faster with
-        # primary shares than with other shares, and lower-numbered
-        # shares are more likely to be primary than higher numbered
-        # shares.
-        new_shnums = sorted(unused_shnums)[:more]
-        if len(new_shnums) < more and not self._verify:
-            # We don't have enough readers to retrieve the file; fail.
-            self._raise_notenoughshareserror()
 
+        if self._verify:
+            new_shnums = unused_shnums # use them all
+        elif len(self._active_readers) < self._required_shares:
+            # need more shares
+            more = self._required_shares - len(self._active_readers)
+            # We favor lower numbered shares, since FEC is faster with
+            # primary shares than with other shares, and lower-numbered
+            # shares are more likely to be primary than higher numbered
+            # shares.
+            new_shnums = sorted(unused_shnums)[:more]
+            if len(new_shnums) < more:
+                # We don't have enough readers to retrieve the file; fail.
+                self._raise_notenoughshareserror()
+        else:
+            new_shnums = []
+
+        self.log("adding %d new peers to the active list" % len(new_shnums))
         for shnum in new_shnums:
             reader = self.readers[shnum]
             self._active_readers.append(reader)