From ccdc2622d49399511edb039570f7340ff2b62a55 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@allmydata.com>
Date: Tue, 16 Jan 2007 19:35:12 -0700
Subject: [PATCH] upload: rearrange peer-selection code to be more readable,
 and fix a silly bug

---
 src/allmydata/upload.py | 130 +++++++++++++++++++++++-----------------
 1 file changed, 74 insertions(+), 56 deletions(-)

diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index 5040e44a..a7548018 100644
--- a/src/allmydata/upload.py
+++ b/src/allmydata/upload.py
@@ -6,6 +6,7 @@ from twisted.application import service
 from foolscap import Referenceable
 
 from allmydata.util import idlib, bencode
+from allmydata.util.idlib import peerid_to_short_string as shortid
 from allmydata.util.deferredutil import DeferredListShouldSucceed
 from allmydata import codec
 
@@ -85,11 +86,8 @@ class FileUploader:
         for p in self.permuted:
             assert isinstance(p, str)
         # we will shrink self.permuted as we give up on peers
-        self.peer_index = 0
-        self.goodness_points = 0
-        self.landlords = [] # list of (peerid, bucket_num, remotebucket)
 
-        d = defer.maybeDeferred(self._check_next_peer)
+        d = defer.maybeDeferred(self._find_peers)
         d.addCallback(self._got_enough_peers)
         d.addCallback(self._compute_uri)
         return d
@@ -97,6 +95,27 @@ class FileUploader:
     def _compute_uri(self, params):
         return "URI:%s" % bencode.bencode((self._verifierid, params))
 
+    def _build_not_enough_peers_error(self):
+        yes = ",".join([shortid(p) for p in self.peers_who_said_yes])
+        no = ",".join([shortid(p) for p in self.peers_who_said_no])
+        err = ",".join([shortid(p) for p in self.peers_who_had_errors])
+        msg = ("%s goodness, want %s, have %d "
+               "landlords, %d total peers, "
+               "peers:yes=%s;no=%s;err=%s" %
+               (self.goodness_points, self.target_goodness,
+                len(self.landlords), self._total_peers,
+                yes, no, err))
+        return msg
+
+    def _find_peers(self):
+        # this returns a Deferred which fires (with a meaningless value) when
+        # enough peers are found, or errbacks with a NotEnoughPeersError if
+        # not.
+        self.peer_index = 0
+        self.goodness_points = 0
+        self.landlords = [] # list of (peerid, bucket_num, remotebucket)
+        return self._check_next_peer()
+
     def _check_next_peer(self):
         if self.debug:
             log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
@@ -104,76 +123,75 @@ class FileUploader:
                     (len(self.permuted), self.goodness_points,
                      self.target_goodness, len(self.landlords),
                      self._total_peers))
-        if len(self.permuted) == 0:
-            # there are no more to check
-            yes = ",".join([idlib.peerid_to_short_string(p)
-                            for p in self.peers_who_said_yes])
-            no = ",".join([idlib.peerid_to_short_string(p)
-                           for p in self.peers_who_said_no])
-            err = ",".join([idlib.peerid_to_short_string(p)
-                            for p in self.peers_who_had_errors])
-            msg = ("%s goodness, want %s, have %d "
-                   "landlords, %d total peers, "
-                   "peers:yes=%s;no=%s;err=%s" %
-                   (self.goodness_points, self.target_goodness,
-                    len(self.landlords), self._total_peers,
-                    yes, no, err))
+        if (self.goodness_points >= self.target_goodness and
+            len(self.landlords) >= self.min_shares):
+            if self.debug: print " we're done!"
+            return "done"
+        if not self.permuted:
+            # we've run out of peers to check without finding enough, which
+            # means we won't be able to upload this file. Bummer.
+            msg = self._build_not_enough_peers_error()
             log.msg("NotEnoughPeersError: %s" % msg)
             raise NotEnoughPeersError(msg)
+
+        # otherwise we use self.peer_index to rotate through all the usable
+        # peers. It gets inremented elsewhere, but wrapped here.
         if self.peer_index >= len(self.permuted):
             self.peer_index = 0
 
         peerid = self.permuted[self.peer_index]
 
+        d = self._check_peer(peerid)
+        d.addCallback(lambda res: self._check_next_peer())
+        return d
+
+    def _check_peer(self, peerid):
+        # contact a single peer, and ask them to hold a share. If they say
+        # yes, we update self.landlords and self.goodness_points, and
+        # increment self.peer_index. If they say no, or are uncontactable, we
+        # remove them from self.permuted. This returns a Deferred which never
+        # errbacks.
+
+        bucket_num = len(self.landlords)
         d = self._peer.get_remote_service(peerid, "storageserver")
         def _got_peer(service):
-            bucket_num = len(self.landlords)
-            if self.debug: print "asking %s" % idlib.b2a(peerid)
+            if self.debug: print "asking %s" % shortid(peerid)
             d2 = service.callRemote("allocate_bucket",
                                     verifierid=self._verifierid,
                                     bucket_num=bucket_num,
                                     size=self._share_size,
                                     leaser=self._peer.nodeid,
                                     canary=Referenceable())
-            def _allocate_response(bucket):
-                if self.debug:
-                    print " peerid %s will grant us a lease" % idlib.b2a(peerid)
-                self.peers_who_said_yes.append(peerid)
-                self.landlords.append( (peerid, bucket_num, bucket) )
-                self.goodness_points += 1
-                if (self.goodness_points >= self.target_goodness and
-                    len(self.landlords) >= self.min_shares):
-                    if self.debug: print " we're done!"
-                    raise HaveAllPeersError()
-                # otherwise we fall through to allocate more peers
-            d2.addCallback(_allocate_response)
             return d2
         d.addCallback(_got_peer)
-        def _done_with_peer(res):
-            if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
-            if isinstance(res, failure.Failure):
-                if res.check(HaveAllPeersError):
-                    if self.debug: print " all done"
-                    # we're done!
-                    return
-                if res.check(TooFullError):
-                    if self.debug: print " too full"
-                    self.peers_who_said_no.append(peerid)
-                elif res.check(IndexError):
-                    if self.debug: print " no connection"
-                    self.peers_who_had_errors.append(peerid)
-                else:
-                    if self.debug: print " other error:", res
-                    self.peers_who_had_errors.append(peerid)
-                self.permuted.remove(peerid) # this peer was unusable
+
+        def _allocate_response(bucket):
+            if self.debug:
+                print " peerid %s will grant us a lease" % shortid(peerid)
+            self.peers_who_said_yes.append(peerid)
+            self.landlords.append( (peerid, bucket_num, bucket) )
+            self.goodness_points += 1
+            self.peer_index += 1
+
+        d.addCallback(_allocate_response)
+
+        def _err(f):
+            if self.debug: print "err from peer %s:" % idlib.b2a(peerid)
+            assert isinstance(f, failure.Failure)
+            if f.check(TooFullError):
+                if self.debug: print " too full"
+                self.peers_who_said_no.append(peerid)
+            elif f.check(IndexError):
+                if self.debug: print " no connection"
+                self.peers_who_had_errors.append(peerid)
             else:
-                if self.debug: print " they gave us a lease"
-                # we get here for either good peers (when we still need
-                # more), or after checking a bad peer (and thus still need
-                # more). So now we need to grab a new peer.
-                self.peer_index += 1
-            return self._check_next_peer()
-        d.addBoth(_done_with_peer)
+                if self.debug: print " other error:", res
+                self.peers_who_had_errors.append(peerid)
+                log.msg("FileUploader._check_peer(%s): err" % shortid(peerid))
+                log.msg(f)
+            self.permuted.remove(peerid) # this peer was unusable
+            return None
+        d.addErrback(_err)
         return d
 
     def _got_enough_peers(self, res):
-- 
2.45.2