X-Git-Url: https://git.rkrishnan.org/?a=blobdiff_plain;f=src%2Fallmydata%2Fstorage_client.py;h=09148932d9271d5ee0fa4c2b4c309a930e69ac90;hb=bc496b72dc66a5e44b3b99bbe711e5ff523e1848;hp=dd9780f23a8c4efd8eefccd1de47cd3d7f92d060;hpb=22bbc705b16769cdf50e24e068d566f08f5352ab;p=tahoe-lafs%2Ftahoe-lafs.git diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index dd9780f2..09148932 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -62,10 +62,13 @@ class StorageFarmBroker: I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. """ - def __init__(self, tub, permute_peers, preferred_peers=()): + def __init__(self, tub, permute_peers, connected_threshold, connected_d, + preferred_peers=()): self.tub = tub assert permute_peers # False not implemented yet self.permute_peers = permute_peers + self.connected_threshold = connected_threshold + self.connected_d = connected_d self.preferred_peers = preferred_peers # self.servers maps serverid -> IServer, and keeps track of all the # storage servers that we've heard about. Each descriptor manages its @@ -76,7 +79,7 @@ class StorageFarmBroker: # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): - s = NativeStorageServer(serverid, ann.copy()) + s = NativeStorageServer(serverid, ann.copy(), self) s.rref = rref s._is_connected = True self.servers[serverid] = s @@ -93,7 +96,7 @@ class StorageFarmBroker: precondition(isinstance(key_s, str), key_s) precondition(key_s.startswith("v0-"), key_s) assert ann["service-name"] == "storage" - s = NativeStorageServer(key_s, ann) + s = NativeStorageServer(key_s, ann, self) serverid = s.get_serverid() old = self.servers.get(serverid) if old: @@ -119,6 +122,13 @@ class StorageFarmBroker: for dsc in self.servers.values(): dsc.try_to_connect() + def check_enough_connected(self): + if (self.connected_d is not None and + len(self.get_connected_servers()) >= self.connected_threshold): + d = self.connected_d + self.connected_d = None + d.callback(None) + def get_servers_for_psi(self, peer_selection_index): # return a list of server objects (IServers) assert self.permute_peers == True @@ -191,9 +201,10 @@ class NativeStorageServer: "application-version": "unknown: no get_version()", } - def __init__(self, key_s, ann): + def __init__(self, key_s, ann, broker): self.key_s = key_s self.announcement = ann + self.broker = broker assert "anonymous-storage-FURL" in ann, ann furl = str(ann["anonymous-storage-FURL"]) @@ -294,6 +305,7 @@ class NativeStorageServer: default = self.VERSION_DEFAULTS d = add_version_to_remote_reference(rref, default) d.addCallback(self._got_versioned_service, lp) + d.addCallback(lambda ign: self.broker.check_enough_connected()) d.addErrback(log.err, format="storageclient._got_connection", name=self.get_name(), umid="Sdq3pg")