]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
upload.py: implement Tahoe2 peer-selection algorithm
authorBrian Warner <warner@lothar.com>
Sun, 16 Sep 2007 08:24:07 +0000 (01:24 -0700)
committerBrian Warner <warner@lothar.com>
Sun, 16 Sep 2007 08:24:07 +0000 (01:24 -0700)
src/allmydata/upload.py

index 425520f0373c51f5c8d1102227725f498c56a4c8..67c59f3888cccdaa0bd6bf90096d41181584cd5a 100644 (file)
@@ -1,7 +1,7 @@
 
 import os
 from zope.interface import implements
-from twisted.python import log
+from twisted.python import log, failure
 from twisted.internet import defer
 from twisted.application import service
 from foolscap import Referenceable
@@ -11,6 +11,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
      bucket_cancel_secret_hash, plaintext_hasher, \
      storage_index_chk_hash, plaintext_segment_hasher, key_hasher
 from allmydata import encode, storage, hashtree, uri
+from allmydata.util import idlib, mathutil
 from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable
 from allmydata.Crypto.Cipher import AES
 
@@ -59,6 +60,11 @@ class PeerTracker:
         self.renew_secret = bucket_renewal_secret
         self.cancel_secret = bucket_cancel_secret
 
+    def __repr__(self):
+        return ("<PeerTracker for peer %s and SI %s>"
+                % (idlib.b2a(self.peerid)[:4],
+                   idlib.b2a(self.storage_index)[:6]))
+
     def query(self, sharenums):
         if not self._storageserver:
             d = self.connection.callRemote("get_service", "storageserver")
@@ -95,6 +101,12 @@ class PeerTracker:
 
 class Tahoe3PeerSelector:
 
+    def __init__(self, upload_id):
+        self.upload_id = upload_id
+
+    def __repr__(self):
+        return "<Tahoe3PeerSelector for upload %s>" % self.upload_id
+
     def get_shareholders(self, client,
                          storage_index, share_size, block_size,
                          num_segments, total_shares, shares_of_happiness,
@@ -138,6 +150,7 @@ class Tahoe3PeerSelector:
                                                            peerid),
                                  )
                      for permutedid, peerid, conn in peers ]
+        self.all_peers = trackers
         self.usable_peers = set(trackers) # this set shrinks over time
         self.used_peers = set() # while this set grows
         self.unallocated_sharenums = set(range(total_shares)) # this one shrinks
@@ -262,6 +275,197 @@ class Tahoe3PeerSelector:
         log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
         self.usable_peers.remove(peer)
 
+class Tahoe2PeerSelector:
+
+    def __init__(self, upload_id):
+        self.upload_id = upload_id
+        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
+        self.error_count = 0
+        self.num_peers_contacted = 0
+        self.last_failure_msg = None
+
+    def __repr__(self):
+        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
+
+    def get_shareholders(self, client,
+                         storage_index, share_size, block_size,
+                         num_segments, total_shares, shares_of_happiness,
+                         push_to_ourselves):
+        """
+        @return: a set of PeerTracker instances that have agreed to hold some
+                 shares for us
+        """
+
+        self.total_shares = total_shares
+        self.shares_of_happiness = shares_of_happiness
+
+        self.homeless_shares = range(total_shares)
+        # self.uncontacted_peers = list() # peers we haven't asked yet
+        self.contacted_peers = list() # peers worth asking again
+        self.use_peers = set() # PeerTrackers that have shares assigned to them
+        self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
+
+        peers = client.get_permuted_peers(storage_index, push_to_ourselves)
+        if not peers:
+            raise encode.NotEnoughPeersError("client gave us zero peers")
+
+        # figure out how much space to ask for
+
+        # this needed_hashes computation should mirror
+        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
+        # (instead of a HashTree) because we don't require actual hashing
+        # just to count the levels.
+        ht = hashtree.IncompleteHashTree(total_shares)
+        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
+
+        # decide upon the renewal/cancel secrets, to include them in the
+        # allocat_buckets query.
+        client_renewal_secret = client.get_renewal_secret()
+        client_cancel_secret = client.get_cancel_secret()
+
+        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
+                                                       storage_index)
+        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
+                                                     storage_index)
+
+        trackers = [ PeerTracker(peerid, permutedid, conn,
+                                 share_size, block_size,
+                                 num_segments, num_share_hashes,
+                                 storage_index,
+                                 bucket_renewal_secret_hash(file_renewal_secret,
+                                                            peerid),
+                                 bucket_cancel_secret_hash(file_cancel_secret,
+                                                           peerid),
+                                 )
+                     for permutedid, peerid, conn in peers ]
+        self.uncontacted_peers = trackers
+
+        d = defer.maybeDeferred(self._loop)
+        return d
+
+    def _loop(self):
+        if not self.homeless_shares:
+            # all done
+            msg = ("placed all %d shares, "
+                   "sent %d queries to %d peers, "
+                   "%d queries placed some shares, %d placed none, "
+                   "got %d errors" %
+                   (self.total_shares,
+                    self.query_count, self.num_peers_contacted,
+                    self.good_query_count, self.bad_query_count,
+                    self.error_count))
+            log.msg("peer selection successful for %s: %s" % (self, msg))
+            return self.use_peers
+
+        if self.uncontacted_peers:
+            peer = self.uncontacted_peers.pop(0)
+            # TODO: don't pre-convert all peerids to PeerTrackers
+            assert isinstance(peer, PeerTracker)
+
+            shares_to_ask = set([self.homeless_shares.pop(0)])
+            self.query_count += 1
+            self.num_peers_contacted += 1
+            d = peer.query(shares_to_ask)
+            d.addBoth(self._got_response, peer, shares_to_ask)
+            return d
+        elif self.contacted_peers:
+            # ask a peer that we've already asked.
+            num_shares = mathutil.div_ceil(len(self.homeless_shares),
+                                           len(self.contacted_peers))
+            shares_to_ask = set(self.homeless_shares[:num_shares])
+            self.homeless_shares[:num_shares] = []
+            peer = self.contacted_peers.pop(0)
+            self.query_count += 1
+            d = peer.query(shares_to_ask)
+            d.addBoth(self._got_response, peer, shares_to_ask)
+            return d
+        else:
+            # no more peers. If we haven't placed enough shares, we fail.
+            placed_shares = self.total_shares - len(self.homeless_shares)
+            if placed_shares < self.shares_of_happiness:
+                msg = ("placed %d shares out of %d total (%d homeless), "
+                       "sent %d queries to %d peers, "
+                       "%d queries placed some shares, %d placed none, "
+                       "got %d errors" %
+                       (self.total_shares - len(self.homeless_shares),
+                        self.total_shares, len(self.homeless_shares),
+                        self.query_count, self.num_peers_contacted,
+                        self.good_query_count, self.bad_query_count,
+                        self.error_count))
+                msg = "peer selection failed for %s: %s" % (self, msg)
+                if self.last_failure_msg:
+                    msg += " (%s)" % (self.last_failure_msg,)
+                log.msg(msg)
+                raise encode.NotEnoughPeersError(msg)
+            else:
+                # we placed enough to be happy, so we're done
+                return self.use_peers
+
+    def _got_response(self, res, peer, shares_to_ask):
+        if isinstance(res, failure.Failure):
+            # This is unusual, and probably indicates a bug or a network
+            # problem.
+            log.msg("%s got error during peer selection: %s" % (peer, res))
+            self.error_count += 1
+            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
+            if self.uncontacted_peers or self.contacted_peers:
+                # there is still hope, so just loop
+                pass
+            else:
+                # No more peers, so this upload might fail (it depends upon
+                # whether we've hit shares_of_happiness or not). Log the last
+                # failure we got: if a coding error causes all peers to fail
+                # in the same way, this allows the common failure to be seen
+                # by the uploader and should help with debugging
+                msg = ("last failure (from %s) was: %s" % (peer, res))
+                self.last_failure_msg = msg
+        else:
+            (alreadygot, allocated) = res
+            progress = False
+            for s in alreadygot:
+                self.preexisting_shares[s] = peer
+                if s in self.homeless_shares:
+                    self.homeless_shares.remove(s)
+                    progress = True
+
+            # the PeerTracker will remember which shares were allocated on
+            # that peer. We just have to remember to use them.
+            if allocated:
+                self.use_peers.add(peer)
+                progress = True
+
+            not_yet_present = set(shares_to_ask) - set(alreadygot)
+            still_homeless = not_yet_present - set(allocated)
+
+            if progress:
+                # they accepted or already had at least one share, so
+                # progress has been made
+                self.good_query_count += 1
+            else:
+                self.bad_query_count += 1
+
+            if still_homeless:
+                # In networks with lots of space, this is very unusual and
+                # probably indicates an error. In networks with peers that
+                # are full, it is merely unusual. In networks that are very
+                # full, it is common, and many uploads will fail. In most
+                # cases, this is obviously not fatal, and we'll just use some
+                # other peers.
+
+                # some shares are still homeless, keep trying to find them a
+                # home. The ones that were rejected get first priority.
+                self.homeless_shares = (list(still_homeless)
+                                        + self.homeless_shares)
+                # Since they were unable to accept all of our requests, so it
+                # is safe to assume that asking them again won't help.
+            else:
+                # if they *were* able to accept everything, they might be
+                # willing to accept even more.
+                self.contacted_peers.append(peer)
+
+        # now loop
+        return self._loop()
+
 
 class EncryptAnUploadable:
     """This is a wrapper that takes an IUploadable and provides
@@ -415,9 +619,10 @@ class CHKUploader:
         return d
 
     def locate_all_shareholders(self, encoder):
-        peer_selector = self.peer_selector_class()
-
         storage_index = encoder.get_param("storage_index")
+        upload_id = idlib.b2a(storage_index)[:6]
+        peer_selector = self.peer_selector_class(upload_id)
+
         share_size = encoder.get_param("share_size")
         block_size = encoder.get_param("block_size")
         num_segments = encoder.get_param("num_segments")