From: Zooko O'Whielacronx Date: Wed, 27 Jan 2010 23:34:17 +0000 (-0800) Subject: immutable: download from the first servers which provide at least K buckets instead... X-Git-Tag: allmydata-tahoe-1.6.0~21 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/css/somewhere?a=commitdiff_plain;h=2bd9dfa5bdacb381fa94a4a538a404df9f30adcc;p=tahoe-lafs%2Ftahoe-lafs.git immutable: download from the first servers which provide at least K buckets instead of waiting for all servers to reply This should put an end to the phenomenon I've been seeing that a single hung server can cause all downloads on a grid to hang. Also it should speed up all downloads by (a) not-waiting for responses to queries that it doesn't need, and (b) downloading shares from the servers which answered the initial query the fastest. Also, do not count how many buckets you've gotten when deciding whether the download has enough shares or not -- instead count how many buckets to *unique* shares that you've gotten. This appears to improve a slightly weird behavior in the current download code in which receiving >= K different buckets all to the same sharenumber would make it think it had enough to download the file when in fact it hadn't. This patch needs tests before it is actually ready for trunk. --- diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index e087b618..05e126b0 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -788,7 +788,7 @@ class CiphertextDownloader(log.PrefixingLogMixin): self._opened = False self.active_buckets = {} # k: shnum, v: bucket - self._share_buckets = [] # list of (sharenum, bucket) tuples + self._share_buckets = {} # k: sharenum, v: list of buckets self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, } @@ -869,7 +869,18 @@ class CiphertextDownloader(log.PrefixingLogMixin): return d def _get_all_shareholders(self): - dl = [] + """ Once the number of buckets that I know about is >= K then I + callback the Deferred that I return. + + If all of the get_buckets deferreds have fired (whether callback or + errback) and I still don't have enough buckets then I'll callback the + Deferred that I return. + """ + self._wait_for_enough_buckets_d = defer.Deferred() + + self._queries_sent = 0 + self._responses_received = 0 + self._queries_failed = 0 sb = self._storage_broker servers = sb.get_servers_for_index(self._storage_index) if not servers: @@ -878,17 +889,15 @@ class CiphertextDownloader(log.PrefixingLogMixin): self.log(format="sending DYHB to [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, umid="rT03hg") + self._queries_sent += 1 d = ss.callRemote("get_buckets", self._storage_index) d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peerid,)) - dl.append(d) - self._responses_received = 0 - self._queries_sent = len(dl) if self._status: self._status.set_status("Locating Shares (%d/%d)" % (self._responses_received, self._queries_sent)) - return defer.DeferredList(dl) + return self._wait_for_enough_buckets_d def _got_response(self, buckets, peerid): self.log(format="got results from [%(peerid)s]: shnums %(shnums)s", @@ -906,6 +915,19 @@ class CiphertextDownloader(log.PrefixingLogMixin): for sharenum, bucket in buckets.iteritems(): b = layout.ReadBucketProxy(bucket, peerid, self._storage_index) self.add_share_bucket(sharenum, b) + # If we just got enough buckets for the first time, then fire the + # deferred. Then remove it from self so that we don't fire it + # again. + if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares: + self._wait_for_enough_buckets_d.callback(True) + self._wait_for_enough_buckets_d = None + + # Else, if we ran out of outstanding requests then fire it and + # remove it from self. + assert (self._responses_received+self._queries_failed) <= self._queries_sent + if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent: + self._wait_for_enough_buckets_d.callback(False) + self._wait_for_enough_buckets_d = None if self._results: if peerid not in self._results.servermap: @@ -914,7 +936,7 @@ class CiphertextDownloader(log.PrefixingLogMixin): def add_share_bucket(self, sharenum, bucket): # this is split out for the benefit of test_encode.py - self._share_buckets.append( (sharenum, bucket) ) + self._share_buckets.setdefault(sharenum, []).append(bucket) def _got_error(self, f): level = log.WEIRD @@ -922,6 +944,13 @@ class CiphertextDownloader(log.PrefixingLogMixin): level = log.UNUSUAL self.log("Error during get_buckets", failure=f, level=level, umid="3uuBUQ") + # If we ran out of outstanding requests then errback it and remove it + # from self. + self._queries_failed += 1 + assert (self._responses_received+self._queries_failed) <= self._queries_sent + if self._wait_for_enough_buckets_d and self._responses_received == self._queries_sent: + self._wait_for_enough_buckets_d.errback() + self._wait_for_enough_buckets_d = None def bucket_failed(self, vbucket): shnum = vbucket.sharenum @@ -964,8 +993,9 @@ class CiphertextDownloader(log.PrefixingLogMixin): uri_extension_fetch_started = time.time() vups = [] - for sharenum, bucket in self._share_buckets: - vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures)) + for sharenum, buckets in self._share_buckets.iteritems(): + for bucket in buckets: + vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures)) vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid) d = vto.start() @@ -1001,9 +1031,10 @@ class CiphertextDownloader(log.PrefixingLogMixin): def _get_crypttext_hash_tree(self, res): vchtps = [] - for sharenum, bucket in self._share_buckets: - vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures) - vchtps.append(vchtp) + for sharenum, buckets in self._share_buckets.iteritems(): + for bucket in buckets: + vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures) + vchtps.append(vchtp) _get_crypttext_hash_tree_started = time.time() if self._status: @@ -1054,9 +1085,10 @@ class CiphertextDownloader(log.PrefixingLogMixin): def _download_all_segments(self, res): - for sharenum, bucket in self._share_buckets: - vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) - self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) + for sharenum, buckets in self._share_buckets.iteritems(): + for bucket in buckets: + vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) + self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) # after the above code, self._share_vbuckets contains enough # buckets to complete the download, and some extra ones to