From ccdc2622d49399511edb039570f7340ff2b62a55 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 16 Jan 2007 19:35:12 -0700 Subject: [PATCH] upload: rearrange peer-selection code to be more readable, and fix a silly bug --- src/allmydata/upload.py | 130 +++++++++++++++++++++++----------------- 1 file changed, 74 insertions(+), 56 deletions(-) diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 5040e44a..a7548018 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -6,6 +6,7 @@ from twisted.application import service from foolscap import Referenceable from allmydata.util import idlib, bencode +from allmydata.util.idlib import peerid_to_short_string as shortid from allmydata.util.deferredutil import DeferredListShouldSucceed from allmydata import codec @@ -85,11 +86,8 @@ class FileUploader: for p in self.permuted: assert isinstance(p, str) # we will shrink self.permuted as we give up on peers - self.peer_index = 0 - self.goodness_points = 0 - self.landlords = [] # list of (peerid, bucket_num, remotebucket) - d = defer.maybeDeferred(self._check_next_peer) + d = defer.maybeDeferred(self._find_peers) d.addCallback(self._got_enough_peers) d.addCallback(self._compute_uri) return d @@ -97,6 +95,27 @@ class FileUploader: def _compute_uri(self, params): return "URI:%s" % bencode.bencode((self._verifierid, params)) + def _build_not_enough_peers_error(self): + yes = ",".join([shortid(p) for p in self.peers_who_said_yes]) + no = ",".join([shortid(p) for p in self.peers_who_said_no]) + err = ",".join([shortid(p) for p in self.peers_who_had_errors]) + msg = ("%s goodness, want %s, have %d " + "landlords, %d total peers, " + "peers:yes=%s;no=%s;err=%s" % + (self.goodness_points, self.target_goodness, + len(self.landlords), self._total_peers, + yes, no, err)) + return msg + + def _find_peers(self): + # this returns a Deferred which fires (with a meaningless value) when + # enough peers are found, or errbacks with a NotEnoughPeersError if + # not. + self.peer_index = 0 + self.goodness_points = 0 + self.landlords = [] # list of (peerid, bucket_num, remotebucket) + return self._check_next_peer() + def _check_next_peer(self): if self.debug: log.msg("FileUploader._check_next_peer: %d permuted, %d goodness" @@ -104,76 +123,75 @@ class FileUploader: (len(self.permuted), self.goodness_points, self.target_goodness, len(self.landlords), self._total_peers)) - if len(self.permuted) == 0: - # there are no more to check - yes = ",".join([idlib.peerid_to_short_string(p) - for p in self.peers_who_said_yes]) - no = ",".join([idlib.peerid_to_short_string(p) - for p in self.peers_who_said_no]) - err = ",".join([idlib.peerid_to_short_string(p) - for p in self.peers_who_had_errors]) - msg = ("%s goodness, want %s, have %d " - "landlords, %d total peers, " - "peers:yes=%s;no=%s;err=%s" % - (self.goodness_points, self.target_goodness, - len(self.landlords), self._total_peers, - yes, no, err)) + if (self.goodness_points >= self.target_goodness and + len(self.landlords) >= self.min_shares): + if self.debug: print " we're done!" + return "done" + if not self.permuted: + # we've run out of peers to check without finding enough, which + # means we won't be able to upload this file. Bummer. + msg = self._build_not_enough_peers_error() log.msg("NotEnoughPeersError: %s" % msg) raise NotEnoughPeersError(msg) + + # otherwise we use self.peer_index to rotate through all the usable + # peers. It gets inremented elsewhere, but wrapped here. if self.peer_index >= len(self.permuted): self.peer_index = 0 peerid = self.permuted[self.peer_index] + d = self._check_peer(peerid) + d.addCallback(lambda res: self._check_next_peer()) + return d + + def _check_peer(self, peerid): + # contact a single peer, and ask them to hold a share. If they say + # yes, we update self.landlords and self.goodness_points, and + # increment self.peer_index. If they say no, or are uncontactable, we + # remove them from self.permuted. This returns a Deferred which never + # errbacks. + + bucket_num = len(self.landlords) d = self._peer.get_remote_service(peerid, "storageserver") def _got_peer(service): - bucket_num = len(self.landlords) - if self.debug: print "asking %s" % idlib.b2a(peerid) + if self.debug: print "asking %s" % shortid(peerid) d2 = service.callRemote("allocate_bucket", verifierid=self._verifierid, bucket_num=bucket_num, size=self._share_size, leaser=self._peer.nodeid, canary=Referenceable()) - def _allocate_response(bucket): - if self.debug: - print " peerid %s will grant us a lease" % idlib.b2a(peerid) - self.peers_who_said_yes.append(peerid) - self.landlords.append( (peerid, bucket_num, bucket) ) - self.goodness_points += 1 - if (self.goodness_points >= self.target_goodness and - len(self.landlords) >= self.min_shares): - if self.debug: print " we're done!" - raise HaveAllPeersError() - # otherwise we fall through to allocate more peers - d2.addCallback(_allocate_response) return d2 d.addCallback(_got_peer) - def _done_with_peer(res): - if self.debug: print "done with peer %s:" % idlib.b2a(peerid) - if isinstance(res, failure.Failure): - if res.check(HaveAllPeersError): - if self.debug: print " all done" - # we're done! - return - if res.check(TooFullError): - if self.debug: print " too full" - self.peers_who_said_no.append(peerid) - elif res.check(IndexError): - if self.debug: print " no connection" - self.peers_who_had_errors.append(peerid) - else: - if self.debug: print " other error:", res - self.peers_who_had_errors.append(peerid) - self.permuted.remove(peerid) # this peer was unusable + + def _allocate_response(bucket): + if self.debug: + print " peerid %s will grant us a lease" % shortid(peerid) + self.peers_who_said_yes.append(peerid) + self.landlords.append( (peerid, bucket_num, bucket) ) + self.goodness_points += 1 + self.peer_index += 1 + + d.addCallback(_allocate_response) + + def _err(f): + if self.debug: print "err from peer %s:" % idlib.b2a(peerid) + assert isinstance(f, failure.Failure) + if f.check(TooFullError): + if self.debug: print " too full" + self.peers_who_said_no.append(peerid) + elif f.check(IndexError): + if self.debug: print " no connection" + self.peers_who_had_errors.append(peerid) else: - if self.debug: print " they gave us a lease" - # we get here for either good peers (when we still need - # more), or after checking a bad peer (and thus still need - # more). So now we need to grab a new peer. - self.peer_index += 1 - return self._check_next_peer() - d.addBoth(_done_with_peer) + if self.debug: print " other error:", res + self.peers_who_had_errors.append(peerid) + log.msg("FileUploader._check_peer(%s): err" % shortid(peerid)) + log.msg(f) + self.permuted.remove(peerid) # this peer was unusable + return None + d.addErrback(_err) return d def _got_enough_peers(self, res): -- 2.45.2