immutable: download from the first servers which provide at least K buckets instead...
authorZooko O'Whielacronx <zooko@zooko.com>
Wed, 27 Jan 2010 23:34:17 +0000 (15:34 -0800)
committerZooko O'Whielacronx <zooko@zooko.com>
Wed, 27 Jan 2010 23:34:17 +0000 (15:34 -0800)
This should put an end to the phenomenon I've been seeing that a single hung server can cause all downloads on a grid to hang.  Also it should speed up all downloads by (a) not-waiting for responses to queries that it doesn't need, and (b) downloading shares from the servers which answered the initial query the fastest.
Also, do not count how many buckets you've gotten when deciding whether the download has enough shares or not -- instead count how many buckets to *unique* shares that you've gotten.  This appears to improve a slightly weird behavior in the current download code in which receiving >= K different buckets all to the same sharenumber would make it think it had enough to download the file when in fact it hadn't.
This patch needs tests before it is actually ready for trunk.

src/allmydata/immutable/download.py

index e087b6181e36471bec4413a4e04d61efe6d99516..05e126b0003ea8b9380fbf960d2c49c3defb33a6 100644 (file)
@@ -788,7 +788,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         self._opened = False
 
         self.active_buckets = {} # k: shnum, v: bucket
-        self._share_buckets = [] # list of (sharenum, bucket) tuples
+        self._share_buckets = {} # k: sharenum, v: list of buckets
         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
 
         self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
@@ -869,7 +869,18 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         return d
 
     def _get_all_shareholders(self):
-        dl = []
+        """ Once the number of buckets that I know about is >= K then I
+        callback the Deferred that I return.
+
+        If all of the get_buckets deferreds have fired (whether callback or
+        errback) and I still don't have enough buckets then I'll callback the
+        Deferred that I return.
+        """
+        self._wait_for_enough_buckets_d = defer.Deferred()
+
+        self._queries_sent = 0
+        self._responses_received = 0
+        self._queries_failed = 0
         sb = self._storage_broker
         servers = sb.get_servers_for_index(self._storage_index)
         if not servers:
@@ -878,17 +889,15 @@ class CiphertextDownloader(log.PrefixingLogMixin):
             self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="rT03hg")
+            self._queries_sent += 1
             d = ss.callRemote("get_buckets", self._storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
-            dl.append(d)
-        self._responses_received = 0
-        self._queries_sent = len(dl)
         if self._status:
             self._status.set_status("Locating Shares (%d/%d)" %
                                     (self._responses_received,
                                      self._queries_sent))
-        return defer.DeferredList(dl)
+        return self._wait_for_enough_buckets_d
 
     def _got_response(self, buckets, peerid):
         self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
@@ -906,6 +915,19 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         for sharenum, bucket in buckets.iteritems():
             b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
             self.add_share_bucket(sharenum, b)
+            # If we just got enough buckets for the first time, then fire the
+            # deferred. Then remove it from self so that we don't fire it
+            # again.
+            if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
+                self._wait_for_enough_buckets_d.callback(True)
+                self._wait_for_enough_buckets_d = None
+
+            # Else, if we ran out of outstanding requests then fire it and
+            # remove it from self.
+            assert (self._responses_received+self._queries_failed) <= self._queries_sent
+            if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
+                self._wait_for_enough_buckets_d.callback(False)
+                self._wait_for_enough_buckets_d = None
 
             if self._results:
                 if peerid not in self._results.servermap:
@@ -914,7 +936,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
     def add_share_bucket(self, sharenum, bucket):
         # this is split out for the benefit of test_encode.py
-        self._share_buckets.append( (sharenum, bucket) )
+        self._share_buckets.setdefault(sharenum, []).append(bucket)
 
     def _got_error(self, f):
         level = log.WEIRD
@@ -922,6 +944,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
             level = log.UNUSUAL
         self.log("Error during get_buckets", failure=f, level=level,
                          umid="3uuBUQ")
+        # If we ran out of outstanding requests then errback it and remove it
+        # from self.
+        self._queries_failed += 1
+        assert (self._responses_received+self._queries_failed) <= self._queries_sent
+        if self._wait_for_enough_buckets_d and self._responses_received == self._queries_sent:
+            self._wait_for_enough_buckets_d.errback()
+            self._wait_for_enough_buckets_d = None
 
     def bucket_failed(self, vbucket):
         shnum = vbucket.sharenum
@@ -964,8 +993,9 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         uri_extension_fetch_started = time.time()
 
         vups = []
-        for sharenum, bucket in self._share_buckets:
-            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
         vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
         d = vto.start()
 
@@ -1001,9 +1031,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
     def _get_crypttext_hash_tree(self, res):
         vchtps = []
-        for sharenum, bucket in self._share_buckets:
-            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
-            vchtps.append(vchtp)
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
+                vchtps.append(vchtp)
 
         _get_crypttext_hash_tree_started = time.time()
         if self._status:
@@ -1054,9 +1085,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
 
     def _download_all_segments(self, res):
-        for sharenum, bucket in self._share_buckets:
-            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
-            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
+                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
 
         # after the above code, self._share_vbuckets contains enough
         # buckets to complete the download, and some extra ones to