From: Brian Warner Date: Sun, 16 Sep 2007 08:26:11 +0000 (-0700) Subject: upload: remove Tahoe3 peer-selection algorithm X-Git-Tag: allmydata-tahoe-0.6.0~92 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/?a=commitdiff_plain;h=8a251d8670f3b8ce9e12f9e68d9f7231d11caf82;p=tahoe-lafs%2Ftahoe-lafs.git upload: remove Tahoe3 peer-selection algorithm --- diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 15a4f360..6e0a0ec0 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -16,7 +16,6 @@ from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable from allmydata.Crypto.Cipher import AES from cStringIO import StringIO -import collections, random class HaveAllPeersError(Exception): @@ -99,182 +98,6 @@ class PeerTracker: self.buckets.update(b) return (alreadygot, set(b.keys())) -class Tahoe3PeerSelector: - - def __init__(self, upload_id): - self.upload_id = upload_id - - def __repr__(self): - return "" % self.upload_id - - def get_shareholders(self, client, - storage_index, share_size, block_size, - num_segments, total_shares, shares_of_happiness, - push_to_ourselves): - """ - @return: a set of PeerTracker instances that have agreed to hold some - shares for us - """ - - self.total_shares = total_shares - self.shares_of_happiness = shares_of_happiness - - # we are responsible for locating the shareholders. self._encoder is - # responsible for handling the data and sending out the shares. - peers = client.get_permuted_peers(storage_index, push_to_ourselves) - - assert peers, "peer selection left us with zero peers for our data" - - # this needed_hashes computation should mirror - # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree - # (instead of a HashTree) because we don't require actual hashing - # just to count the levels. - ht = hashtree.IncompleteHashTree(total_shares) - num_share_hashes = len(ht.needed_hashes(0, include_leaf=True)) - - client_renewal_secret = client.get_renewal_secret() - client_cancel_secret = client.get_cancel_secret() - - file_renewal_secret = file_renewal_secret_hash(client_renewal_secret, - storage_index) - file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, - storage_index) - - trackers = [ PeerTracker(peerid, permutedid, conn, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - bucket_renewal_secret_hash(file_renewal_secret, - peerid), - bucket_cancel_secret_hash(file_cancel_secret, - peerid), - ) - for permutedid, peerid, conn in peers ] - self.all_peers = trackers - self.usable_peers = set(trackers) # this set shrinks over time - self.used_peers = set() # while this set grows - self.unallocated_sharenums = set(range(total_shares)) # this one shrinks - - return self._locate_more_shareholders() - - def _locate_more_shareholders(self): - d = self._query_peers() - d.addCallback(self._located_some_shareholders) - return d - - def _located_some_shareholders(self, res): - log.msg("_located_some_shareholders") - log.msg(" still need homes for %d shares, still have %d usable peers" - % (len(self.unallocated_sharenums), len(self.usable_peers))) - if not self.unallocated_sharenums: - # Finished allocating places for all shares. - log.msg("%s._locate_all_shareholders() " - "Finished allocating places for all shares." % self) - log.msg("used_peers is %s" % (self.used_peers,)) - return self.used_peers - if not self.usable_peers: - # Ran out of peers who have space. - log.msg("%s._locate_all_shareholders() " - "Ran out of peers who have space." % self) - margin = self.total_shares - self.shares_of_happiness - if len(self.unallocated_sharenums) < margin: - # But we allocated places for enough shares. - log.msg("%s._locate_all_shareholders() " - "But we allocated places for enough shares.") - return self.used_peers - raise encode.NotEnoughPeersError - # we need to keep trying - return self._locate_more_shareholders() - - def _create_ring_of_things(self): - PEER = 1 # must sort later than SHARE, for consistency with download - SHARE = 0 - # ring_of_things is a list of (position_in_ring, whatami, x) where - # whatami is SHARE if x is a sharenum or else PEER if x is a - # PeerTracker instance - ring_of_things = [] - ring_of_things.extend([ (peer.permutedid, PEER, peer,) - for peer in self.usable_peers ]) - shares = [ (i * 2**160 / self.total_shares, SHARE, i) - for i in self.unallocated_sharenums] - ring_of_things.extend(shares) - ring_of_things.sort() - ring_of_things = collections.deque(ring_of_things) - return ring_of_things - - def _query_peers(self): - """ - @return: a deferred that fires when all queries have resolved - """ - PEER = 1 - SHARE = 0 - ring = self._create_ring_of_things() - - # Choose a random starting point, talk to that peer. - ring.rotate(random.randrange(0, len(ring))) - - # Walk backwards to find a peer. We know that we'll eventually find - # one because we earlier asserted that there was at least one. - while ring[0][1] != PEER: - ring.rotate(-1) - peer = ring[0][2] - assert isinstance(peer, PeerTracker), peer - ring.rotate(-1) - - # loop invariant: at the top of the loop, we are always one step to - # the left of a peer, which is stored in the peer variable. - outstanding_queries = [] - sharenums_to_query = set() - for i in range(len(ring)): - if ring[0][1] == SHARE: - sharenums_to_query.add(ring[0][2]) - else: - if True or sharenums_to_query: - d = peer.query(sharenums_to_query) - d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,)) - outstanding_queries.append(d) - d.addErrback(log.err) - peer = ring[0][2] - sharenums_to_query = set() - ring.rotate(-1) - - return defer.DeferredList(outstanding_queries) - - def _got_response(self, (alreadygot, allocated), peer, shares_we_requested): - """ - @type alreadygot: a set of sharenums - @type allocated: a set of sharenums - """ - # TODO: some future version of Foolscap might not convert inbound - # sets into sets.Set on us, even when we're using 2.4 - alreadygot = set(alreadygot) - allocated = set(allocated) - #log.msg("%s._got_response(%s, %s, %s): " - # "self.unallocated_sharenums: %s, unhandled: %s" - # % (self, (alreadygot, allocated), peer, shares_we_requested, - # self.unallocated_sharenums, - # shares_we_requested - alreadygot - allocated)) - self.unallocated_sharenums -= alreadygot - self.unallocated_sharenums -= allocated - - if allocated: - self.used_peers.add(peer) - - if shares_we_requested - alreadygot - allocated: - # Then he didn't accept some of the shares, so he's full. - - #log.msg("%s._got_response(%s, %s, %s): " - # "self.unallocated_sharenums: %s, unhandled: %s HE'S FULL" - # % (self, - # (alreadygot, allocated), peer, shares_we_requested, - # self.unallocated_sharenums, - # shares_we_requested - alreadygot - allocated)) - self.usable_peers.remove(peer) - - def _got_error(self, f, peer): - log.msg("%s._got_error(%s, %s)" % (self, f, peer,)) - self.usable_peers.remove(peer) - class Tahoe2PeerSelector: def __init__(self, upload_id):