From: Kevan Carstensen Date: Mon, 16 Nov 2009 20:28:05 +0000 (-0700) Subject: Alter Tahoe2PeerSelector to make sure that it recognizes existing shares on readonly... X-Git-Tag: trac-4400~98 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/flags?a=commitdiff_plain;h=a816de3f2396d0819a01fef9355c5929d6891b9c;p=tahoe-lafs%2Ftahoe-lafs.git Alter Tahoe2PeerSelector to make sure that it recognizes existing shares on readonly servers, fixing an issue in #778 --- diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 00efceaa..70b7d6d6 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -114,6 +114,15 @@ class PeerTracker: d.addCallback(self._got_reply) return d + def query_allocated(self): + d = self._storageserver.callRemote("get_buckets", + self.storage_index) + d.addCallback(self._got_allocate_reply) + return d + + def _got_allocate_reply(self, buckets): + return (self.peerid, buckets) + def _got_reply(self, (alreadygot, buckets)): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) b = {} @@ -183,6 +192,12 @@ class Tahoe2PeerSelector: self._started_second_pass = False self.use_peers = set() # PeerTrackers that have shares assigned to them self.preexisting_shares = {} # sharenum -> peerid holding the share + # We don't try to allocate shares to these servers, since they've + # said that they're incapable of storing shares of the size that + # we'd want to store. We keep them around because they may have + # existing shares for this storage index, which we want to know + # about for accurate servers_of_happiness accounting + self.readonly_peers = [] peers = storage_broker.get_servers_for_index(storage_index) if not peers: @@ -209,10 +224,10 @@ class Tahoe2PeerSelector: (peerid, conn) = peer v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] - peers = [peer for peer in peers - if _get_maxsize(peer) >= allocated_size] - if not peers: - raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size) + new_peers = [peer for peer in peers + if _get_maxsize(peer) >= allocated_size] + old_peers = list(set(peers).difference(set(new_peers))) + peers = new_peers # decide upon the renewal/cancel secrets, to include them in the # allocate_buckets query. @@ -223,22 +238,38 @@ class Tahoe2PeerSelector: storage_index) file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, storage_index) - - trackers = [ PeerTracker(peerid, conn, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - bucket_renewal_secret_hash(file_renewal_secret, - peerid), - bucket_cancel_secret_hash(file_cancel_secret, + def _make_trackers(peers): + return [ PeerTracker(peerid, conn, + share_size, block_size, + num_segments, num_share_hashes, + storage_index, + bucket_renewal_secret_hash(file_renewal_secret, peerid), - ) - for (peerid, conn) in peers ] - self.uncontacted_peers = trackers - - d = defer.maybeDeferred(self._loop) + bucket_cancel_secret_hash(file_cancel_secret, + peerid)) + for (peerid, conn) in peers] + self.uncontacted_peers = _make_trackers(peers) + self.readonly_peers = _make_trackers(old_peers) + # Talk to the readonly servers to get an idea of what servers + # have what shares (if any) for this storage index + d = defer.maybeDeferred(self._existing_shares) + d.addCallback(lambda ign: self._loop()) return d + def _existing_shares(self): + if self.readonly_peers: + peer = self.readonly_peers.pop() + assert isinstance(peer, PeerTracker) + d = peer.query_allocated() + d.addCallback(self._handle_allocate_response) + return d + + def _handle_allocate_response(self, (peer, buckets)): + for bucket in buckets: + self.preexisting_shares[bucket] = peer + if self.homeless_shares: + self.homeless_shares.remove(bucket) + return self._existing_shares() def _loop(self): if not self.homeless_shares: