From 2bd9dfa5bdacb381fa94a4a538a404df9f30adcc Mon Sep 17 00:00:00 2001
From: Zooko O'Whielacronx <zooko@zooko.com>
Date: Wed, 27 Jan 2010 15:34:17 -0800
Subject: [PATCH] immutable: download from the first servers which provide at
 least K buckets instead of waiting for all servers to reply This should put
 an end to the phenomenon I've been seeing that a single hung server can cause
 all downloads on a grid to hang.  Also it should speed up all downloads by
 (a) not-waiting for responses to queries that it doesn't need, and (b)
 downloading shares from the servers which answered the initial query the
 fastest. Also, do not count how many buckets you've gotten when deciding
 whether the download has enough shares or not -- instead count how many
 buckets to *unique* shares that you've gotten.  This appears to improve a
 slightly weird behavior in the current download code in which receiving >= K
 different buckets all to the same sharenumber would make it think it had
 enough to download the file when in fact it hadn't. This patch needs tests
 before it is actually ready for trunk.

---
 src/allmydata/immutable/download.py | 62 ++++++++++++++++++++++-------
 1 file changed, 47 insertions(+), 15 deletions(-)

diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py
index e087b618..05e126b0 100644
--- a/src/allmydata/immutable/download.py
+++ b/src/allmydata/immutable/download.py
@@ -788,7 +788,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         self._opened = False
 
         self.active_buckets = {} # k: shnum, v: bucket
-        self._share_buckets = [] # list of (sharenum, bucket) tuples
+        self._share_buckets = {} # k: sharenum, v: list of buckets
         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
 
         self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
@@ -869,7 +869,18 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         return d
 
     def _get_all_shareholders(self):
-        dl = []
+        """ Once the number of buckets that I know about is >= K then I
+        callback the Deferred that I return.
+
+        If all of the get_buckets deferreds have fired (whether callback or
+        errback) and I still don't have enough buckets then I'll callback the
+        Deferred that I return.
+        """
+        self._wait_for_enough_buckets_d = defer.Deferred()
+
+        self._queries_sent = 0
+        self._responses_received = 0
+        self._queries_failed = 0
         sb = self._storage_broker
         servers = sb.get_servers_for_index(self._storage_index)
         if not servers:
@@ -878,17 +889,15 @@ class CiphertextDownloader(log.PrefixingLogMixin):
             self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="rT03hg")
+            self._queries_sent += 1
             d = ss.callRemote("get_buckets", self._storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
-            dl.append(d)
-        self._responses_received = 0
-        self._queries_sent = len(dl)
         if self._status:
             self._status.set_status("Locating Shares (%d/%d)" %
                                     (self._responses_received,
                                      self._queries_sent))
-        return defer.DeferredList(dl)
+        return self._wait_for_enough_buckets_d
 
     def _got_response(self, buckets, peerid):
         self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
@@ -906,6 +915,19 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         for sharenum, bucket in buckets.iteritems():
             b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
             self.add_share_bucket(sharenum, b)
+            # If we just got enough buckets for the first time, then fire the
+            # deferred. Then remove it from self so that we don't fire it
+            # again.
+            if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
+                self._wait_for_enough_buckets_d.callback(True)
+                self._wait_for_enough_buckets_d = None
+
+            # Else, if we ran out of outstanding requests then fire it and
+            # remove it from self.
+            assert (self._responses_received+self._queries_failed) <= self._queries_sent
+            if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
+                self._wait_for_enough_buckets_d.callback(False)
+                self._wait_for_enough_buckets_d = None
 
             if self._results:
                 if peerid not in self._results.servermap:
@@ -914,7 +936,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
     def add_share_bucket(self, sharenum, bucket):
         # this is split out for the benefit of test_encode.py
-        self._share_buckets.append( (sharenum, bucket) )
+        self._share_buckets.setdefault(sharenum, []).append(bucket)
 
     def _got_error(self, f):
         level = log.WEIRD
@@ -922,6 +944,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
             level = log.UNUSUAL
         self.log("Error during get_buckets", failure=f, level=level,
                          umid="3uuBUQ")
+        # If we ran out of outstanding requests then errback it and remove it
+        # from self.
+        self._queries_failed += 1
+        assert (self._responses_received+self._queries_failed) <= self._queries_sent
+        if self._wait_for_enough_buckets_d and self._responses_received == self._queries_sent:
+            self._wait_for_enough_buckets_d.errback()
+            self._wait_for_enough_buckets_d = None
 
     def bucket_failed(self, vbucket):
         shnum = vbucket.sharenum
@@ -964,8 +993,9 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         uri_extension_fetch_started = time.time()
 
         vups = []
-        for sharenum, bucket in self._share_buckets:
-            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
         vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
         d = vto.start()
 
@@ -1001,9 +1031,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
     def _get_crypttext_hash_tree(self, res):
         vchtps = []
-        for sharenum, bucket in self._share_buckets:
-            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
-            vchtps.append(vchtp)
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
+                vchtps.append(vchtp)
 
         _get_crypttext_hash_tree_started = time.time()
         if self._status:
@@ -1054,9 +1085,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
 
     def _download_all_segments(self, res):
-        for sharenum, bucket in self._share_buckets:
-            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
-            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
+                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
 
         # after the above code, self._share_vbuckets contains enough
         # buckets to complete the download, and some extra ones to
-- 
2.45.2