import os
from zope.interface import implements
-from twisted.python import log
+from twisted.python import log, failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable
from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_chk_hash, plaintext_segment_hasher, key_hasher
from allmydata import encode, storage, hashtree, uri
+from allmydata.util import idlib, mathutil
from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable
from allmydata.Crypto.Cipher import AES
self.renew_secret = bucket_renewal_secret
self.cancel_secret = bucket_cancel_secret
+ def __repr__(self):
+ return ("<PeerTracker for peer %s and SI %s>"
+ % (idlib.b2a(self.peerid)[:4],
+ idlib.b2a(self.storage_index)[:6]))
+
def query(self, sharenums):
if not self._storageserver:
d = self.connection.callRemote("get_service", "storageserver")
class Tahoe3PeerSelector:
+ def __init__(self, upload_id):
+ self.upload_id = upload_id
+
+ def __repr__(self):
+ return "<Tahoe3PeerSelector for upload %s>" % self.upload_id
+
def get_shareholders(self, client,
storage_index, share_size, block_size,
num_segments, total_shares, shares_of_happiness,
peerid),
)
for permutedid, peerid, conn in peers ]
+ self.all_peers = trackers
self.usable_peers = set(trackers) # this set shrinks over time
self.used_peers = set() # while this set grows
self.unallocated_sharenums = set(range(total_shares)) # this one shrinks
log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
self.usable_peers.remove(peer)
+class Tahoe2PeerSelector:
+
+ def __init__(self, upload_id):
+ self.upload_id = upload_id
+        self.query_count = 0        # total queries sent
+        self.good_query_count = 0   # queries that placed at least one share
+        self.bad_query_count = 0    # queries that placed none
+        self.error_count = 0        # queries that failed outright
+ self.num_peers_contacted = 0
+ self.last_failure_msg = None
+
+ def __repr__(self):
+ return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
+
+ def get_shareholders(self, client,
+ storage_index, share_size, block_size,
+ num_segments, total_shares, shares_of_happiness,
+ push_to_ourselves):
+ """
+ @return: a set of PeerTracker instances that have agreed to hold some
+ shares for us
+ """
+
+ self.total_shares = total_shares
+ self.shares_of_happiness = shares_of_happiness
+
+ self.homeless_shares = range(total_shares)
+        # self.uncontacted_peers (peers we haven't asked yet) is populated
+        # below, once the PeerTrackers have been built
+ self.contacted_peers = list() # peers worth asking again
+ self.use_peers = set() # PeerTrackers that have shares assigned to them
+ self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
+
+ peers = client.get_permuted_peers(storage_index, push_to_ourselves)
+ if not peers:
+ raise encode.NotEnoughPeersError("client gave us zero peers")
+
+ # figure out how much space to ask for
+
+ # this needed_hashes computation should mirror
+ # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
+ # (instead of a HashTree) because we don't require actual hashing
+ # just to count the levels.
+ ht = hashtree.IncompleteHashTree(total_shares)
+ num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
+
+ # decide upon the renewal/cancel secrets, to include them in the
+        # allocate_buckets query.
+ client_renewal_secret = client.get_renewal_secret()
+ client_cancel_secret = client.get_cancel_secret()
+
+ file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
+ storage_index)
+ file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
+ storage_index)
+
+ trackers = [ PeerTracker(peerid, permutedid, conn,
+ share_size, block_size,
+ num_segments, num_share_hashes,
+ storage_index,
+ bucket_renewal_secret_hash(file_renewal_secret,
+ peerid),
+ bucket_cancel_secret_hash(file_cancel_secret,
+ peerid),
+ )
+ for permutedid, peerid, conn in peers ]
+ self.uncontacted_peers = trackers
+
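+        # _loop may raise NotEnoughPeersError synchronously, so wrap it in
+        # maybeDeferred to present a uniform Deferred-returning interface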
+ d = defer.maybeDeferred(self._loop)
+ return d
+
+ def _loop(self):
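+        # Selection strategy: ask each uncontacted peer for one share, then
+        # spread the remaining homeless shares across the peers that
+        # accepted everything we sent them, until all shares are placed or
+        # we run out of peers.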
+ if not self.homeless_shares:
+ # all done
+ msg = ("placed all %d shares, "
+ "sent %d queries to %d peers, "
+ "%d queries placed some shares, %d placed none, "
+ "got %d errors" %
+ (self.total_shares,
+ self.query_count, self.num_peers_contacted,
+ self.good_query_count, self.bad_query_count,
+ self.error_count))
+ log.msg("peer selection successful for %s: %s" % (self, msg))
+ return self.use_peers
+
+ if self.uncontacted_peers:
+ peer = self.uncontacted_peers.pop(0)
+ # TODO: don't pre-convert all peerids to PeerTrackers
+ assert isinstance(peer, PeerTracker)
+
+ shares_to_ask = set([self.homeless_shares.pop(0)])
+ self.query_count += 1
+ self.num_peers_contacted += 1
+ d = peer.query(shares_to_ask)
+ d.addBoth(self._got_response, peer, shares_to_ask)
+ return d
+ elif self.contacted_peers:
+ # ask a peer that we've already asked.
+ num_shares = mathutil.div_ceil(len(self.homeless_shares),
+ len(self.contacted_peers))
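+            # e.g. 10 homeless shares across 4 re-askable peers means asking
+            # this peer for div_ceil(10,4)=3 of them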
+ shares_to_ask = set(self.homeless_shares[:num_shares])
+ self.homeless_shares[:num_shares] = []
+ peer = self.contacted_peers.pop(0)
+ self.query_count += 1
+ d = peer.query(shares_to_ask)
+ d.addBoth(self._got_response, peer, shares_to_ask)
+ return d
+ else:
+ # no more peers. If we haven't placed enough shares, we fail.
+ placed_shares = self.total_shares - len(self.homeless_shares)
+ if placed_shares < self.shares_of_happiness:
+ msg = ("placed %d shares out of %d total (%d homeless), "
+ "sent %d queries to %d peers, "
+ "%d queries placed some shares, %d placed none, "
+ "got %d errors" %
+                   (placed_shares,
+ self.total_shares, len(self.homeless_shares),
+ self.query_count, self.num_peers_contacted,
+ self.good_query_count, self.bad_query_count,
+ self.error_count))
+ msg = "peer selection failed for %s: %s" % (self, msg)
+ if self.last_failure_msg:
+ msg += " (%s)" % (self.last_failure_msg,)
+ log.msg(msg)
+ raise encode.NotEnoughPeersError(msg)
+ else:
+ # we placed enough to be happy, so we're done
+ return self.use_peers
+
+ def _got_response(self, res, peer, shares_to_ask):
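+        # attached with addBoth, so 'res' is either an (alreadygot,
+        # allocated) tuple or a Failure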
+ if isinstance(res, failure.Failure):
+ # This is unusual, and probably indicates a bug or a network
+ # problem.
+ log.msg("%s got error during peer selection: %s" % (peer, res))
+ self.error_count += 1
+ self.homeless_shares = list(shares_to_ask) + self.homeless_shares
+ if self.uncontacted_peers or self.contacted_peers:
+ # there is still hope, so just loop
+ pass
+ else:
+ # No more peers, so this upload might fail (it depends upon
+ # whether we've hit shares_of_happiness or not). Log the last
+ # failure we got: if a coding error causes all peers to fail
+ # in the same way, this allows the common failure to be seen
+ # by the uploader and should help with debugging
+ msg = ("last failure (from %s) was: %s" % (peer, res))
+ self.last_failure_msg = msg
+ else:
+ (alreadygot, allocated) = res
+ progress = False
+ for s in alreadygot:
+ self.preexisting_shares[s] = peer
+ if s in self.homeless_shares:
+ self.homeless_shares.remove(s)
+ progress = True
+
+ # the PeerTracker will remember which shares were allocated on
+ # that peer. We just have to remember to use them.
+ if allocated:
+ self.use_peers.add(peer)
+ progress = True
+
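+            # of the shares we asked about, find the ones that this peer
+            # neither already had nor just agreed to hold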
+ not_yet_present = set(shares_to_ask) - set(alreadygot)
+ still_homeless = not_yet_present - set(allocated)
+
+ if progress:
+ # they accepted or already had at least one share, so
+ # progress has been made
+ self.good_query_count += 1
+ else:
+ self.bad_query_count += 1
+
+ if still_homeless:
+ # In networks with lots of space, this is very unusual and
+ # probably indicates an error. In networks with peers that
+ # are full, it is merely unusual. In networks that are very
+ # full, it is common, and many uploads will fail. In most
+ # cases, this is obviously not fatal, and we'll just use some
+ # other peers.
+
+ # some shares are still homeless, keep trying to find them a
+ # home. The ones that were rejected get first priority.
+ self.homeless_shares = (list(still_homeless)
+ + self.homeless_shares)
+            # Since they were unable to accept all of our requests, it is
+            # safe to assume that asking them again won't help.
+ else:
+ # if they *were* able to accept everything, they might be
+ # willing to accept even more.
+ self.contacted_peers.append(peer)
+
+ # now loop
+ return self._loop()
+
class EncryptAnUploadable:
"""This is a wrapper that takes an IUploadable and provides
return d
def locate_all_shareholders(self, encoder):
- peer_selector = self.peer_selector_class()
-
storage_index = encoder.get_param("storage_index")
+ upload_id = idlib.b2a(storage_index)[:6]
+ peer_selector = self.peer_selector_class(upload_id)
+
share_size = encoder.get_param("share_size")
block_size = encoder.get_param("block_size")
num_segments = encoder.get_param("num_segments")