Alter Tahoe2PeerSelector to make sure that it recognizes existing shares on readonly...
author Kevan Carstensen <kevan@isnotajoke.com>
Mon, 16 Nov 2009 20:28:05 +0000 (13:28 -0700)
committer Kevan Carstensen <kevan@isnotajoke.com>
Mon, 16 Nov 2009 20:28:05 +0000 (13:28 -0700)
src/allmydata/immutable/upload.py

index 00efceaacded4215529b9086c0d2d0b67d40407b..70b7d6d6f018e15d94fc2bd53514ecffe2f25d8a 100644 (file)
@@ -114,6 +114,15 @@ class PeerTracker:
         d.addCallback(self._got_reply)
         return d
 
+    def query_allocated(self):
+        """Ask the server for its buckets; fires with (peerid, buckets)."""
+        reply = self._storageserver.callRemote("get_buckets", self.storage_index)
+        reply.addCallback(self._got_allocate_reply)
+        return reply
+
+    def _got_allocate_reply(self, buckets):  # callback for query_allocated()
+        return self.peerid, buckets  # tag the reply with the answering peer's id
+
     def _got_reply(self, (alreadygot, buckets)):
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
         b = {}
@@ -183,6 +192,12 @@ class Tahoe2PeerSelector:
         self._started_second_pass = False
         self.use_peers = set() # PeerTrackers that have shares assigned to them
         self.preexisting_shares = {} # sharenum -> peerid holding the share
+        # We don't try to allocate shares to these servers, since they've
+        # said that they're incapable of storing shares of the size that
+        # we'd want to store. We keep them around because they may have
+        # existing shares for this storage index, which we want to know
+        # about for accurate servers_of_happiness accounting.
+        self.readonly_peers = []
 
         peers = storage_broker.get_servers_for_index(storage_index)
         if not peers:
@@ -209,10 +224,10 @@ class Tahoe2PeerSelector:
             (peerid, conn) = peer
             v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             return v1["maximum-immutable-share-size"]
-        peers = [peer for peer in peers
-                 if _get_maxsize(peer) >= allocated_size]
-        if not peers:
-            raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)
+        new_peers = [peer for peer in peers
+                     if _get_maxsize(peer) >= allocated_size]
+        old_peers = list(set(peers).difference(set(new_peers)))
+        peers = new_peers
 
         # decide upon the renewal/cancel secrets, to include them in the
         # allocate_buckets query.
@@ -223,22 +238,38 @@ class Tahoe2PeerSelector:
                                                        storage_index)
         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                      storage_index)
-
-        trackers = [ PeerTracker(peerid, conn,
-                                 share_size, block_size,
-                                 num_segments, num_share_hashes,
-                                 storage_index,
-                                 bucket_renewal_secret_hash(file_renewal_secret,
-                                                            peerid),
-                                 bucket_cancel_secret_hash(file_cancel_secret,
+        def _make_trackers(peers):
+           return [ PeerTracker(peerid, conn,
+                                share_size, block_size,
+                                num_segments, num_share_hashes,
+                                storage_index,
+                                bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
-                                 )
-                     for (peerid, conn) in peers ]
-        self.uncontacted_peers = trackers
-
-        d = defer.maybeDeferred(self._loop)
+                                bucket_cancel_secret_hash(file_cancel_secret,
+                                                          peerid))
+                    for (peerid, conn) in peers]
+        self.uncontacted_peers = _make_trackers(peers)
+        self.readonly_peers = _make_trackers(old_peers)
+        # Talk to the readonly servers to get an idea of what servers
+        # have what shares (if any) for this storage index
+        d = defer.maybeDeferred(self._existing_shares)
+        d.addCallback(lambda ign: self._loop())
         return d
 
+    def _existing_shares(self):
+        """Query the next read-only peer's shares; None once none remain."""
+        if not self.readonly_peers:
+            return None
+        tracker = self.readonly_peers.pop()
+        assert isinstance(tracker, PeerTracker)
+        return tracker.query_allocated().addCallback(self._handle_allocate_response)
+
+    def _handle_allocate_response(self, (peer, buckets)):
+        for sharenum in buckets:  # share numbers that `peer` reports holding
+            self.preexisting_shares[sharenum] = peer
+            if sharenum in self.homeless_shares:  # remove() raises if absent
+                self.homeless_shares.remove(sharenum)
+        return self._existing_shares()  # move on to the next read-only peer
 
     def _loop(self):
         if not self.homeless_shares: