d.addCallback(self._got_reply)
return d
+ def query_allocated(self):
+ d = self._storageserver.callRemote("get_buckets",
+ self.storage_index)
+ d.addCallback(self._got_allocate_reply)
+ return d
+
+ def _got_allocate_reply(self, buckets):
+ return (self.peerid, buckets)
+
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {}
self._started_second_pass = False
self.use_peers = set() # PeerTrackers that have shares assigned to them
self.preexisting_shares = {} # sharenum -> peerid holding the share
+ # We don't try to allocate shares to these servers, since they've
+ # said that they're incapable of storing shares of the size that
+ # we'd want to store. We keep them around because they may have
+ # existing shares for this storage index, which we want to know
+ # about for accurate servers_of_happiness accounting
+ self.readonly_peers = []
peers = storage_broker.get_servers_for_index(storage_index)
if not peers:
(peerid, conn) = peer
v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
- peers = [peer for peer in peers
- if _get_maxsize(peer) >= allocated_size]
- if not peers:
- raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)
+ new_peers = [peer for peer in peers
+ if _get_maxsize(peer) >= allocated_size]
+ old_peers = list(set(peers).difference(set(new_peers)))
+ peers = new_peers
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
storage_index)
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
storage_index)
-
- trackers = [ PeerTracker(peerid, conn,
- share_size, block_size,
- num_segments, num_share_hashes,
- storage_index,
- bucket_renewal_secret_hash(file_renewal_secret,
- peerid),
- bucket_cancel_secret_hash(file_cancel_secret,
+ def _make_trackers(peers):
+ return [ PeerTracker(peerid, conn,
+ share_size, block_size,
+ num_segments, num_share_hashes,
+ storage_index,
+ bucket_renewal_secret_hash(file_renewal_secret,
peerid),
- )
- for (peerid, conn) in peers ]
- self.uncontacted_peers = trackers
-
- d = defer.maybeDeferred(self._loop)
+ bucket_cancel_secret_hash(file_cancel_secret,
+ peerid))
+ for (peerid, conn) in peers]
+ self.uncontacted_peers = _make_trackers(peers)
+ self.readonly_peers = _make_trackers(old_peers)
+ # Talk to the readonly servers to get an idea of what servers
+ # have what shares (if any) for this storage index
+ d = defer.maybeDeferred(self._existing_shares)
+ d.addCallback(lambda ign: self._loop())
return d
+ def _existing_shares(self):
+ if self.readonly_peers:
+ peer = self.readonly_peers.pop()
+ assert isinstance(peer, PeerTracker)
+ d = peer.query_allocated()
+ d.addCallback(self._handle_allocate_response)
+ return d
+
+ def _handle_allocate_response(self, (peer, buckets)):
+ for bucket in buckets:
+ self.preexisting_shares[bucket] = peer
+ if self.homeless_shares:
+ self.homeless_shares.remove(bucket)
+ return self._existing_shares()
def _loop(self):
if not self.homeless_shares: