self._opened = False
self.active_buckets = {} # k: shnum, v: bucket
- self._share_buckets = [] # list of (sharenum, bucket) tuples
+ self._share_buckets = {} # k: sharenum, v: list of buckets
self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
return d
def _get_all_shareholders(self):
- dl = []
+ """ Once the number of buckets that I know about is >= K then I
+ callback the Deferred that I return.
+
+ If all of the get_buckets deferreds have fired (whether callback or
+ errback) and I still don't have enough buckets then I'll callback the
+ Deferred that I return.
+ """
+ self._wait_for_enough_buckets_d = defer.Deferred()
+
+ self._queries_sent = 0
+ self._responses_received = 0
+ self._queries_failed = 0
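+ # Invariant (checked by asserts in _got_response and _got_error):
+ # _responses_received + _queries_failed never exceeds _queries_sent;
+ # equality means no get_buckets request is still outstanding.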
sb = self._storage_broker
servers = sb.get_servers_for_index(self._storage_index)
if not servers:
self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="rT03hg")
+ self._queries_sent += 1
d = ss.callRemote("get_buckets", self._storage_index)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,))
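+ # note: callbackArgs applies only to _got_response; _got_error
+ # receives just the Failure, so it cannot identify the peer.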
- dl.append(d)
- self._responses_received = 0
- self._queries_sent = len(dl)
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
self._queries_sent))
- return defer.DeferredList(dl)
+ return self._wait_for_enough_buckets_d
def _got_response(self, buckets, peerid):
self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
for sharenum, bucket in buckets.iteritems():
b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
self.add_share_bucket(sharenum, b)
+ # If we just got buckets for enough distinct shares for the first
+ # time, then fire the deferred with True. Then remove it from self
+ # so that we don't fire it again.
+ if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
+ self._wait_for_enough_buckets_d.callback(True)
+ self._wait_for_enough_buckets_d = None
+
+ # Else, if we ran out of outstanding requests then fire it with
+ # False and remove it from self.
+ assert (self._responses_received+self._queries_failed) <= self._queries_sent
+ if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
+ self._wait_for_enough_buckets_d.callback(False)
+ self._wait_for_enough_buckets_d = None
if self._results:
if peerid not in self._results.servermap:
def add_share_bucket(self, sharenum, bucket):
# this is split out for the benefit of test_encode.py
- self._share_buckets.append( (sharenum, bucket) )
+ self._share_buckets.setdefault(sharenum, []).append(bucket)
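+ # e.g. if two servers each hold a copy of share 3, this yields
+ # {3: [bucket_from_server_a, bucket_from_server_b]} (illustrative
+ # names, not real variables).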
def _got_error(self, f):
level = log.WEIRD
level = log.UNUSUAL
self.log("Error during get_buckets", failure=f, level=level,
umid="3uuBUQ")
+ # If we ran out of outstanding requests without finding enough
+ # distinct shares, then fire the deferred with False and remove it
+ # from self.
+ self._queries_failed += 1
+ assert (self._responses_received+self._queries_failed) <= self._queries_sent
+ if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
+ self._wait_for_enough_buckets_d.callback(False)
+ self._wait_for_enough_buckets_d = None
def bucket_failed(self, vbucket):
shnum = vbucket.sharenum
uri_extension_fetch_started = time.time()
vups = []
- for sharenum, bucket in self._share_buckets:
- vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
+ for sharenum, buckets in self._share_buckets.iteritems():
+ for bucket in buckets:
+ vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
d = vto.start()
def _get_crypttext_hash_tree(self, res):
vchtps = []
- for sharenum, bucket in self._share_buckets:
- vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
- vchtps.append(vchtp)
+ for sharenum, buckets in self._share_buckets.iteritems():
+ for bucket in buckets:
+ vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
+ vchtps.append(vchtp)
_get_crypttext_hash_tree_started = time.time()
if self._status:
def _download_all_segments(self, res):
- for sharenum, bucket in self._share_buckets:
- vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
- self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
+ for sharenum, buckets in self._share_buckets.iteritems():
+ for bucket in buckets:
+ vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
+ self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
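+ # Keeping every validated bucket per share number (a set per shnum)
+ # is deliberate: if one bucket later fails validation (see
+ # bucket_failed above), an alternate bucket holding the same share
+ # can presumably be used in its place.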
# after the above code, self._share_vbuckets contains enough
# buckets to complete the download, and some extra ones to