Skip to content

Commit d9f98b1

Browse files
committed
Make schema agreement waiting code renew connection on each iteration
When schema agreement is started it could happen that control connection is getting disconnected/reconnected, when it happens schema agreement code used to use disconnected connection to run all the queries. As result, it could lead to schema agreement timeout, even if all nodes got schema updated long time ago. This commit updates connection on every iteration and makes it iterate when underlying connection is clused
1 parent 2354c82 commit d9f98b1

File tree

1 file changed

+9
-8
lines changed

1 file changed

+9
-8
lines changed

cassandra/cluster.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -4220,15 +4220,14 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42204220
if self._is_shutdown:
42214221
return
42224222

4223-
if not connection:
4224-
connection = self._connection
4223+
current_connection = connection or self._connection
42254224

42264225
if preloaded_results:
42274226
log.debug("[control connection] Attempting to use preloaded results for schema agreement")
42284227

42294228
peers_result = preloaded_results[0]
42304229
local_result = preloaded_results[1]
4231-
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
4230+
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint)
42324231
if schema_mismatches is None:
42334232
return True
42344233

@@ -4237,16 +4236,18 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42374236
elapsed = 0
42384237
cl = ConsistencyLevel.ONE
42394238
schema_mismatches = None
4240-
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)
4239+
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, current_connection)
42414240

42424241
while elapsed < total_timeout:
4242+
current_connection = connection or self._connection
4243+
42434244
peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout),
42444245
consistency_level=cl)
42454246
local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
42464247
consistency_level=cl)
42474248
try:
42484249
timeout = min(self._timeout, total_timeout - elapsed)
4249-
peers_result, local_result = connection.wait_for_responses(
4250+
peers_result, local_result = current_connection.wait_for_responses(
42504251
peers_query, local_query, timeout=timeout)
42514252
except OperationTimedOut as timeout:
42524253
log.debug("[control connection] Timed out waiting for "
@@ -4258,9 +4259,9 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42584259
log.debug("[control connection] Aborting wait for schema match due to shutdown")
42594260
return None
42604261
else:
4261-
raise
4262+
continue
42624263

4263-
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
4264+
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint)
42644265
if schema_mismatches is None:
42654266
return True
42664267

@@ -4269,7 +4270,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42694270
elapsed = self._time.time() - start
42704271

42714272
log.warning("Node %s is reporting a schema disagreement: %s",
4272-
connection.endpoint, schema_mismatches)
4273+
current_connection.endpoint, schema_mismatches)
42734274
return False
42744275

42754276
def _get_schema_mismatches(self, peers_result, local_result, local_address):

0 commit comments

Comments
 (0)