@@ -4220,15 +4220,14 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42204220 if self ._is_shutdown :
42214221 return
42224222
4223- if not connection :
4224- connection = self ._connection
4223+ current_connection = connection or self ._connection
42254224
42264225 if preloaded_results :
42274226 log .debug ("[control connection] Attempting to use preloaded results for schema agreement" )
42284227
42294228 peers_result = preloaded_results [0 ]
42304229 local_result = preloaded_results [1 ]
4231- schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , connection .endpoint )
4230+ schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , current_connection .endpoint )
42324231 if schema_mismatches is None :
42334232 return True
42344233
@@ -4237,16 +4236,27 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42374236 elapsed = 0
42384237 cl = ConsistencyLevel .ONE
42394238 schema_mismatches = None
4240- select_peers_query = self ._get_peers_query (self .PeersQueryType .PEERS_SCHEMA , connection )
4239+ select_peers_query = self ._get_peers_query (self .PeersQueryType .PEERS_SCHEMA , current_connection )
4240+ error_signaled = False
42414241
42424242 while elapsed < total_timeout :
4243+ if current_connection != connection or self ._connection :
4244+ current_connection = connection or self ._connection
4245+ error_signaled = False
4246+
4247+ if current_connection .is_defunct or current_connection .is_closed :
4248+ log .debug ("[control connection] connection is closed, wait and trying again" )
4249+ self ._time .sleep (0.2 )
4250+ elapsed = self ._time .time () - start
4251+ continue
4252+
42434253 peers_query = QueryMessage (query = maybe_add_timeout_to_query (select_peers_query , self ._metadata_request_timeout ),
42444254 consistency_level = cl )
42454255 local_query = QueryMessage (query = maybe_add_timeout_to_query (self ._SELECT_SCHEMA_LOCAL , self ._metadata_request_timeout ),
42464256 consistency_level = cl )
42474257 try :
42484258 timeout = min (self ._timeout , total_timeout - elapsed )
4249- peers_result , local_result = connection .wait_for_responses (
4259+ peers_result , local_result = current_connection .wait_for_responses (
42504260 peers_query , local_query , timeout = timeout )
42514261 except OperationTimedOut as timeout :
42524262 log .debug ("[control connection] Timed out waiting for "
@@ -4257,10 +4267,12 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42574267 if self ._is_shutdown :
42584268 log .debug ("[control connection] Aborting wait for schema match due to shutdown" )
42594269 return None
4260- else :
4261- raise
4270+ elif not error_signaled :
4271+ self ._signal_error ()
4272+ error_signaled = True
4273+ continue
42624274
4263- schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , connection .endpoint )
4275+ schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , current_connection .endpoint )
42644276 if schema_mismatches is None :
42654277 return True
42664278
@@ -4269,7 +4281,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42694281 elapsed = self ._time .time () - start
42704282
42714283 log .warning ("Node %s is reporting a schema disagreement: %s" ,
4272- connection .endpoint , schema_mismatches )
4284+ current_connection .endpoint , schema_mismatches )
42734285 return False
42744286
42754287 def _get_schema_mismatches (self , peers_result , local_result , local_address ):
0 commit comments