]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
upload: remove Tahoe3 peer-selection algorithm
authorBrian Warner <warner@lothar.com>
Sun, 16 Sep 2007 08:26:11 +0000 (01:26 -0700)
committerBrian Warner <warner@lothar.com>
Sun, 16 Sep 2007 08:26:11 +0000 (01:26 -0700)
src/allmydata/upload.py

index 15a4f3600f61c478bf1916351bd2ab994969c604..6e0a0ec0de7447d7260d10f5fa3f5ba889ae0f50 100644 (file)
@@ -16,7 +16,6 @@ from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable
 from allmydata.Crypto.Cipher import AES
 
 from cStringIO import StringIO
-import collections, random
 
 
 class HaveAllPeersError(Exception):
@@ -99,182 +98,6 @@ class PeerTracker:
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
 
-class Tahoe3PeerSelector:
-
-    def __init__(self, upload_id):
-        self.upload_id = upload_id
-
-    def __repr__(self):
-        return "<Tahoe3PeerSelector for upload %s>" % self.upload_id
-
-    def get_shareholders(self, client,
-                         storage_index, share_size, block_size,
-                         num_segments, total_shares, shares_of_happiness,
-                         push_to_ourselves):
-        """
-        @return: a set of PeerTracker instances that have agreed to hold some
-            shares for us
-        """
-
-        self.total_shares = total_shares
-        self.shares_of_happiness = shares_of_happiness
-
-        # we are responsible for locating the shareholders. self._encoder is
-        # responsible for handling the data and sending out the shares.
-        peers = client.get_permuted_peers(storage_index, push_to_ourselves)
-
-        assert peers, "peer selection left us with zero peers for our data"
-
-        # this needed_hashes computation should mirror
-        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
-        # (instead of a HashTree) because we don't require actual hashing
-        # just to count the levels.
-        ht = hashtree.IncompleteHashTree(total_shares)
-        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
-
-        client_renewal_secret = client.get_renewal_secret()
-        client_cancel_secret = client.get_cancel_secret()
-
-        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
-                                                       storage_index)
-        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
-                                                     storage_index)
-
-        trackers = [ PeerTracker(peerid, permutedid, conn,
-                                 share_size, block_size,
-                                 num_segments, num_share_hashes,
-                                 storage_index,
-                                 bucket_renewal_secret_hash(file_renewal_secret,
-                                                            peerid),
-                                 bucket_cancel_secret_hash(file_cancel_secret,
-                                                           peerid),
-                                 )
-                     for permutedid, peerid, conn in peers ]
-        self.all_peers = trackers
-        self.usable_peers = set(trackers) # this set shrinks over time
-        self.used_peers = set() # while this set grows
-        self.unallocated_sharenums = set(range(total_shares)) # this one shrinks
-
-        return self._locate_more_shareholders()
-
-    def _locate_more_shareholders(self):
-        d = self._query_peers()
-        d.addCallback(self._located_some_shareholders)
-        return d
-
-    def _located_some_shareholders(self, res):
-        log.msg("_located_some_shareholders")
-        log.msg(" still need homes for %d shares, still have %d usable peers"
-                % (len(self.unallocated_sharenums), len(self.usable_peers)))
-        if not self.unallocated_sharenums:
-            # Finished allocating places for all shares.
-            log.msg("%s._locate_all_shareholders() "
-                    "Finished allocating places for all shares." % self)
-            log.msg("used_peers is %s" % (self.used_peers,))
-            return self.used_peers
-        if not self.usable_peers:
-            # Ran out of peers who have space.
-            log.msg("%s._locate_all_shareholders() "
-                    "Ran out of peers who have space." % self)
-            margin = self.total_shares - self.shares_of_happiness
-            if len(self.unallocated_sharenums) < margin:
-                # But we allocated places for enough shares.
-                log.msg("%s._locate_all_shareholders() "
-                        "But we allocated places for enough shares.")
-                return self.used_peers
-            raise encode.NotEnoughPeersError
-        # we need to keep trying
-        return self._locate_more_shareholders()
-
-    def _create_ring_of_things(self):
-        PEER = 1 # must sort later than SHARE, for consistency with download
-        SHARE = 0
-        # ring_of_things is a list of (position_in_ring, whatami, x) where
-        # whatami is SHARE if x is a sharenum or else PEER if x is a
-        # PeerTracker instance
-        ring_of_things = []
-        ring_of_things.extend([ (peer.permutedid, PEER, peer,)
-                                for peer in self.usable_peers ])
-        shares = [ (i * 2**160 / self.total_shares, SHARE, i)
-                   for i in self.unallocated_sharenums]
-        ring_of_things.extend(shares)
-        ring_of_things.sort()
-        ring_of_things = collections.deque(ring_of_things)
-        return ring_of_things
-        
-    def _query_peers(self):
-        """
-        @return: a deferred that fires when all queries have resolved
-        """
-        PEER = 1
-        SHARE = 0
-        ring = self._create_ring_of_things()
-
-        # Choose a random starting point, talk to that peer.
-        ring.rotate(random.randrange(0, len(ring)))
-
-        # Walk backwards to find a peer.  We know that we'll eventually find
-        # one because we earlier asserted that there was at least one.
-        while ring[0][1] != PEER:
-            ring.rotate(-1)
-        peer = ring[0][2]
-        assert isinstance(peer, PeerTracker), peer
-        ring.rotate(-1)
-
-        # loop invariant: at the top of the loop, we are always one step to
-        # the left of a peer, which is stored in the peer variable.
-        outstanding_queries = []
-        sharenums_to_query = set()
-        for i in range(len(ring)):
-            if ring[0][1] == SHARE:
-                sharenums_to_query.add(ring[0][2])
-            else:
-                if True or sharenums_to_query:
-                    d = peer.query(sharenums_to_query)
-                    d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
-                    outstanding_queries.append(d)
-                    d.addErrback(log.err)
-                peer = ring[0][2]
-                sharenums_to_query = set()
-            ring.rotate(-1)
-        
-        return defer.DeferredList(outstanding_queries)
-
-    def _got_response(self, (alreadygot, allocated), peer, shares_we_requested):
-        """
-        @type alreadygot: a set of sharenums
-        @type allocated: a set of sharenums
-        """
-        # TODO: some future version of Foolscap might not convert inbound
-        # sets into sets.Set on us, even when we're using 2.4
-        alreadygot = set(alreadygot)
-        allocated = set(allocated)
-        #log.msg("%s._got_response(%s, %s, %s): "
-        #        "self.unallocated_sharenums: %s, unhandled: %s"
-        #        % (self, (alreadygot, allocated), peer, shares_we_requested,
-        #           self.unallocated_sharenums,
-        #           shares_we_requested - alreadygot - allocated))
-        self.unallocated_sharenums -= alreadygot
-        self.unallocated_sharenums -= allocated
-
-        if allocated:
-            self.used_peers.add(peer)
-
-        if shares_we_requested - alreadygot - allocated:
-            # Then he didn't accept some of the shares, so he's full.
-
-            #log.msg("%s._got_response(%s, %s, %s): "
-            #        "self.unallocated_sharenums: %s, unhandled: %s HE'S FULL"
-            #        % (self,
-            #           (alreadygot, allocated), peer, shares_we_requested,
-            #           self.unallocated_sharenums,
-            #           shares_we_requested - alreadygot - allocated))
-            self.usable_peers.remove(peer)
-
-    def _got_error(self, f, peer):
-        log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
-        self.usable_peers.remove(peer)
-
 class Tahoe2PeerSelector:
 
     def __init__(self, upload_id):