from allmydata.Crypto.Cipher import AES
from cStringIO import StringIO
-import collections, random
class HaveAllPeersError(Exception):
self.buckets.update(b)
return (alreadygot, set(b.keys()))
-class Tahoe3PeerSelector:
-
- def __init__(self, upload_id):
- self.upload_id = upload_id
-
- def __repr__(self):
- return "<Tahoe3PeerSelector for upload %s>" % self.upload_id
-
- def get_shareholders(self, client,
- storage_index, share_size, block_size,
- num_segments, total_shares, shares_of_happiness,
- push_to_ourselves):
- """
- @return: a set of PeerTracker instances that have agreed to hold some
- shares for us
- """
-
- self.total_shares = total_shares
- self.shares_of_happiness = shares_of_happiness
-
- # we are responsible for locating the shareholders. self._encoder is
- # responsible for handling the data and sending out the shares.
- peers = client.get_permuted_peers(storage_index, push_to_ourselves)
-
- assert peers, "peer selection left us with zero peers for our data"
-
- # this needed_hashes computation should mirror
- # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
- # (instead of a HashTree) because we don't require actual hashing
- # just to count the levels.
- ht = hashtree.IncompleteHashTree(total_shares)
- num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
-
- client_renewal_secret = client.get_renewal_secret()
- client_cancel_secret = client.get_cancel_secret()
-
- file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
- storage_index)
- file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
- storage_index)
-
- trackers = [ PeerTracker(peerid, permutedid, conn,
- share_size, block_size,
- num_segments, num_share_hashes,
- storage_index,
- bucket_renewal_secret_hash(file_renewal_secret,
- peerid),
- bucket_cancel_secret_hash(file_cancel_secret,
- peerid),
- )
- for permutedid, peerid, conn in peers ]
- self.all_peers = trackers
- self.usable_peers = set(trackers) # this set shrinks over time
- self.used_peers = set() # while this set grows
- self.unallocated_sharenums = set(range(total_shares)) # this one shrinks
-
- return self._locate_more_shareholders()
-
- def _locate_more_shareholders(self):
- d = self._query_peers()
- d.addCallback(self._located_some_shareholders)
- return d
-
- def _located_some_shareholders(self, res):
- log.msg("_located_some_shareholders")
- log.msg(" still need homes for %d shares, still have %d usable peers"
- % (len(self.unallocated_sharenums), len(self.usable_peers)))
- if not self.unallocated_sharenums:
- # Finished allocating places for all shares.
- log.msg("%s._locate_all_shareholders() "
- "Finished allocating places for all shares." % self)
- log.msg("used_peers is %s" % (self.used_peers,))
- return self.used_peers
- if not self.usable_peers:
- # Ran out of peers who have space.
- log.msg("%s._locate_all_shareholders() "
- "Ran out of peers who have space." % self)
- margin = self.total_shares - self.shares_of_happiness
- if len(self.unallocated_sharenums) < margin:
- # But we allocated places for enough shares.
- log.msg("%s._locate_all_shareholders() "
- "But we allocated places for enough shares.")
- return self.used_peers
- raise encode.NotEnoughPeersError
- # we need to keep trying
- return self._locate_more_shareholders()
-
- def _create_ring_of_things(self):
- PEER = 1 # must sort later than SHARE, for consistency with download
- SHARE = 0
- # ring_of_things is a list of (position_in_ring, whatami, x) where
- # whatami is SHARE if x is a sharenum or else PEER if x is a
- # PeerTracker instance
- ring_of_things = []
- ring_of_things.extend([ (peer.permutedid, PEER, peer,)
- for peer in self.usable_peers ])
- shares = [ (i * 2**160 / self.total_shares, SHARE, i)
- for i in self.unallocated_sharenums]
- ring_of_things.extend(shares)
- ring_of_things.sort()
- ring_of_things = collections.deque(ring_of_things)
- return ring_of_things
-
- def _query_peers(self):
- """
- @return: a deferred that fires when all queries have resolved
- """
- PEER = 1
- SHARE = 0
- ring = self._create_ring_of_things()
-
- # Choose a random starting point, talk to that peer.
- ring.rotate(random.randrange(0, len(ring)))
-
- # Walk backwards to find a peer. We know that we'll eventually find
- # one because we earlier asserted that there was at least one.
- while ring[0][1] != PEER:
- ring.rotate(-1)
- peer = ring[0][2]
- assert isinstance(peer, PeerTracker), peer
- ring.rotate(-1)
-
- # loop invariant: at the top of the loop, we are always one step to
- # the left of a peer, which is stored in the peer variable.
- outstanding_queries = []
- sharenums_to_query = set()
- for i in range(len(ring)):
- if ring[0][1] == SHARE:
- sharenums_to_query.add(ring[0][2])
- else:
- if True or sharenums_to_query:
- d = peer.query(sharenums_to_query)
- d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
- outstanding_queries.append(d)
- d.addErrback(log.err)
- peer = ring[0][2]
- sharenums_to_query = set()
- ring.rotate(-1)
-
- return defer.DeferredList(outstanding_queries)
-
- def _got_response(self, (alreadygot, allocated), peer, shares_we_requested):
- """
- @type alreadygot: a set of sharenums
- @type allocated: a set of sharenums
- """
- # TODO: some future version of Foolscap might not convert inbound
- # sets into sets.Set on us, even when we're using 2.4
- alreadygot = set(alreadygot)
- allocated = set(allocated)
- #log.msg("%s._got_response(%s, %s, %s): "
- # "self.unallocated_sharenums: %s, unhandled: %s"
- # % (self, (alreadygot, allocated), peer, shares_we_requested,
- # self.unallocated_sharenums,
- # shares_we_requested - alreadygot - allocated))
- self.unallocated_sharenums -= alreadygot
- self.unallocated_sharenums -= allocated
-
- if allocated:
- self.used_peers.add(peer)
-
- if shares_we_requested - alreadygot - allocated:
- # Then he didn't accept some of the shares, so he's full.
-
- #log.msg("%s._got_response(%s, %s, %s): "
- # "self.unallocated_sharenums: %s, unhandled: %s HE'S FULL"
- # % (self,
- # (alreadygot, allocated), peer, shares_we_requested,
- # self.unallocated_sharenums,
- # shares_we_requested - alreadygot - allocated))
- self.usable_peers.remove(peer)
-
- def _got_error(self, f, peer):
- log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
- self.usable_peers.remove(peer)
-
class Tahoe2PeerSelector:
def __init__(self, upload_id):