From: Brian Warner Date: Sun, 16 Sep 2007 08:24:07 +0000 (-0700) Subject: upload.py: implement Tahoe2 peer-selection algorithm X-Git-Tag: allmydata-tahoe-0.6.0~94 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/%22doc.html?a=commitdiff_plain;h=979d12cd42f95e36951e14f1727e06840a4112cd;p=tahoe-lafs%2Ftahoe-lafs.git upload.py: implement Tahoe2 peer-selection algorithm --- diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 425520f0..67c59f38 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -1,7 +1,7 @@ import os from zope.interface import implements -from twisted.python import log +from twisted.python import log, failure from twisted.internet import defer from twisted.application import service from foolscap import Referenceable @@ -11,6 +11,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \ bucket_cancel_secret_hash, plaintext_hasher, \ storage_index_chk_hash, plaintext_segment_hasher, key_hasher from allmydata import encode, storage, hashtree, uri +from allmydata.util import idlib, mathutil from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable from allmydata.Crypto.Cipher import AES @@ -59,6 +60,11 @@ class PeerTracker: self.renew_secret = bucket_renewal_secret self.cancel_secret = bucket_cancel_secret + def __repr__(self): + return ("" + % (idlib.b2a(self.peerid)[:4], + idlib.b2a(self.storage_index)[:6])) + def query(self, sharenums): if not self._storageserver: d = self.connection.callRemote("get_service", "storageserver") @@ -95,6 +101,12 @@ class PeerTracker: class Tahoe3PeerSelector: + def __init__(self, upload_id): + self.upload_id = upload_id + + def __repr__(self): + return "" % self.upload_id + def get_shareholders(self, client, storage_index, share_size, block_size, num_segments, total_shares, shares_of_happiness, @@ -138,6 +150,7 @@ class Tahoe3PeerSelector: peerid), ) for permutedid, peerid, conn in peers ] + self.all_peers = trackers self.usable_peers = set(trackers) # this set shrinks over time self.used_peers = set() # while this set grows self.unallocated_sharenums = set(range(total_shares)) # this one shrinks @@ -262,6 +275,197 @@ class Tahoe3PeerSelector: log.msg("%s._got_error(%s, %s)" % (self, f, peer,)) self.usable_peers.remove(peer) +class Tahoe2PeerSelector: + + def __init__(self, upload_id): + self.upload_id = upload_id + self.query_count, self.good_query_count, self.bad_query_count = 0,0,0 + self.error_count = 0 + self.num_peers_contacted = 0 + self.last_failure_msg = None + + def __repr__(self): + return "" % self.upload_id + + def get_shareholders(self, client, + storage_index, share_size, block_size, + num_segments, total_shares, shares_of_happiness, + push_to_ourselves): + """ + @return: a set of PeerTracker instances that have agreed to hold some + shares for us + """ + + self.total_shares = total_shares + self.shares_of_happiness = shares_of_happiness + + self.homeless_shares = range(total_shares) + # self.uncontacted_peers = list() # peers we haven't asked yet + self.contacted_peers = list() # peers worth asking again + self.use_peers = set() # PeerTrackers that have shares assigned to them + self.preexisting_shares = {} # sharenum -> PeerTracker holding the share + + peers = client.get_permuted_peers(storage_index, push_to_ourselves) + if not peers: + raise encode.NotEnoughPeersError("client gave us zero peers") + + # figure out how much space to ask for + + # this needed_hashes computation should mirror + # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree + # (instead of a HashTree) because we don't require actual hashing + # just to count the levels. + ht = hashtree.IncompleteHashTree(total_shares) + num_share_hashes = len(ht.needed_hashes(0, include_leaf=True)) + + # decide upon the renewal/cancel secrets, to include them in the + # allocat_buckets query. + client_renewal_secret = client.get_renewal_secret() + client_cancel_secret = client.get_cancel_secret() + + file_renewal_secret = file_renewal_secret_hash(client_renewal_secret, + storage_index) + file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, + storage_index) + + trackers = [ PeerTracker(peerid, permutedid, conn, + share_size, block_size, + num_segments, num_share_hashes, + storage_index, + bucket_renewal_secret_hash(file_renewal_secret, + peerid), + bucket_cancel_secret_hash(file_cancel_secret, + peerid), + ) + for permutedid, peerid, conn in peers ] + self.uncontacted_peers = trackers + + d = defer.maybeDeferred(self._loop) + return d + + def _loop(self): + if not self.homeless_shares: + # all done + msg = ("placed all %d shares, " + "sent %d queries to %d peers, " + "%d queries placed some shares, %d placed none, " + "got %d errors" % + (self.total_shares, + self.query_count, self.num_peers_contacted, + self.good_query_count, self.bad_query_count, + self.error_count)) + log.msg("peer selection successful for %s: %s" % (self, msg)) + return self.use_peers + + if self.uncontacted_peers: + peer = self.uncontacted_peers.pop(0) + # TODO: don't pre-convert all peerids to PeerTrackers + assert isinstance(peer, PeerTracker) + + shares_to_ask = set([self.homeless_shares.pop(0)]) + self.query_count += 1 + self.num_peers_contacted += 1 + d = peer.query(shares_to_ask) + d.addBoth(self._got_response, peer, shares_to_ask) + return d + elif self.contacted_peers: + # ask a peer that we've already asked. + num_shares = mathutil.div_ceil(len(self.homeless_shares), + len(self.contacted_peers)) + shares_to_ask = set(self.homeless_shares[:num_shares]) + self.homeless_shares[:num_shares] = [] + peer = self.contacted_peers.pop(0) + self.query_count += 1 + d = peer.query(shares_to_ask) + d.addBoth(self._got_response, peer, shares_to_ask) + return d + else: + # no more peers. If we haven't placed enough shares, we fail. + placed_shares = self.total_shares - len(self.homeless_shares) + if placed_shares < self.shares_of_happiness: + msg = ("placed %d shares out of %d total (%d homeless), " + "sent %d queries to %d peers, " + "%d queries placed some shares, %d placed none, " + "got %d errors" % + (self.total_shares - len(self.homeless_shares), + self.total_shares, len(self.homeless_shares), + self.query_count, self.num_peers_contacted, + self.good_query_count, self.bad_query_count, + self.error_count)) + msg = "peer selection failed for %s: %s" % (self, msg) + if self.last_failure_msg: + msg += " (%s)" % (self.last_failure_msg,) + log.msg(msg) + raise encode.NotEnoughPeersError(msg) + else: + # we placed enough to be happy, so we're done + return self.use_peers + + def _got_response(self, res, peer, shares_to_ask): + if isinstance(res, failure.Failure): + # This is unusual, and probably indicates a bug or a network + # problem. + log.msg("%s got error during peer selection: %s" % (peer, res)) + self.error_count += 1 + self.homeless_shares = list(shares_to_ask) + self.homeless_shares + if self.uncontacted_peers or self.contacted_peers: + # there is still hope, so just loop + pass + else: + # No more peers, so this upload might fail (it depends upon + # whether we've hit shares_of_happiness or not). Log the last + # failure we got: if a coding error causes all peers to fail + # in the same way, this allows the common failure to be seen + # by the uploader and should help with debugging + msg = ("last failure (from %s) was: %s" % (peer, res)) + self.last_failure_msg = msg + else: + (alreadygot, allocated) = res + progress = False + for s in alreadygot: + self.preexisting_shares[s] = peer + if s in self.homeless_shares: + self.homeless_shares.remove(s) + progress = True + + # the PeerTracker will remember which shares were allocated on + # that peer. We just have to remember to use them. + if allocated: + self.use_peers.add(peer) + progress = True + + not_yet_present = set(shares_to_ask) - set(alreadygot) + still_homeless = not_yet_present - set(allocated) + + if progress: + # they accepted or already had at least one share, so + # progress has been made + self.good_query_count += 1 + else: + self.bad_query_count += 1 + + if still_homeless: + # In networks with lots of space, this is very unusual and + # probably indicates an error. In networks with peers that + # are full, it is merely unusual. In networks that are very + # full, it is common, and many uploads will fail. In most + # cases, this is obviously not fatal, and we'll just use some + # other peers. + + # some shares are still homeless, keep trying to find them a + # home. The ones that were rejected get first priority. + self.homeless_shares = (list(still_homeless) + + self.homeless_shares) + # Since they were unable to accept all of our requests, so it + # is safe to assume that asking them again won't help. + else: + # if they *were* able to accept everything, they might be + # willing to accept even more. + self.contacted_peers.append(peer) + + # now loop + return self._loop() + class EncryptAnUploadable: """This is a wrapper that takes an IUploadable and provides @@ -415,9 +619,10 @@ class CHKUploader: return d def locate_all_shareholders(self, encoder): - peer_selector = self.peer_selector_class() - storage_index = encoder.get_param("storage_index") + upload_id = idlib.b2a(storage_index)[:6] + peer_selector = self.peer_selector_class(upload_id) + share_size = encoder.get_param("share_size") block_size = encoder.get_param("block_size") num_segments = encoder.get_param("num_segments")