diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index bc6526c8bb0c0..23c835975efcc 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -575,6 +575,10 @@ def get_default_system_parameters( "storage_statistics_retention_duration", "enable_paused_cluster_readhold_downgrade", "enable_builtin_migration_schema_evolution", + "kafka_retry_backoff", + "kafka_retry_backoff_max", + "kafka_reconnect_backoff", + "kafka_reconnect_backoff_max", ] diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 1f7f0406bf0fe..15df2ddabb8f8 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1517,6 +1517,10 @@ def __init__( "enable_paused_cluster_readhold_downgrade", "enable_with_ordinality_legacy_fallback", "enable_builtin_migration_schema_evolution", + "kafka_retry_backoff", + "kafka_retry_backoff_max", + "kafka_reconnect_backoff", + "kafka_reconnect_backoff_max", ] def run(self, exe: Executor) -> bool: diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index fc82d2602c0fa..241968b49c606 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -60,7 +60,8 @@ use crate::connections::string_or_secret::StringOrSecret; use crate::controller::AlterError; use crate::dyncfgs::{ ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES, - KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, + KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF, + KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX, }; use crate::errors::{ContextCreationError, CsrConnectError}; @@ -852,6 +853,35 @@ impl KafkaConnection { } } + options.insert( + "retry.backoff.ms".into(), + KAFKA_RETRY_BACKOFF + 
.get(storage_configuration.config_set()) + .as_millis() + .into(), + ); + options.insert( + "retry.backoff.max.ms".into(), + KAFKA_RETRY_BACKOFF_MAX + .get(storage_configuration.config_set()) + .as_millis() + .into(), + ); + options.insert( + "reconnect.backoff.ms".into(), + KAFKA_RECONNECT_BACKOFF + .get(storage_configuration.config_set()) + .as_millis() + .into(), + ); + options.insert( + "reconnect.backoff.max.ms".into(), + KAFKA_RECONNECT_BACKOFF_MAX + .get(storage_configuration.config_set()) + .as_millis() + .into(), + ); + let mut config = mz_kafka_util::client::create_new_client_config( storage_configuration .connection_context diff --git a/src/storage-types/src/dyncfgs.rs b/src/storage-types/src/dyncfgs.rs index 058aeeea15ebd..c78c9aabe3086 100644 --- a/src/storage-types/src/dyncfgs.rs +++ b/src/storage-types/src/dyncfgs.rs @@ -129,6 +129,40 @@ pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config most this number of elements.", ); +/// Sets retry.backoff.ms in librdkafka for sources and sinks. +/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md> +pub const KAFKA_RETRY_BACKOFF: Config<Duration> = Config::new( + "kafka_retry_backoff", + Duration::from_millis(100), + "Sets retry.backoff.ms in librdkafka for sources and sinks.", +); + +/// Sets retry.backoff.max.ms in librdkafka for sources and sinks. +/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md> +pub const KAFKA_RETRY_BACKOFF_MAX: Config<Duration> = Config::new( + "kafka_retry_backoff_max", + Duration::from_secs(1), + "Sets retry.backoff.max.ms in librdkafka for sources and sinks.", +); + +/// Sets reconnect.backoff.ms in librdkafka for sources and sinks. +/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md> +pub const KAFKA_RECONNECT_BACKOFF: Config<Duration> = Config::new( + "kafka_reconnect_backoff", + Duration::from_millis(100), + "Sets reconnect.backoff.ms in librdkafka for sources and sinks.", +); + +/// Sets reconnect.backoff.max.ms in librdkafka for sources and sinks. +/// We default to 30s instead of 10s to avoid constant reconnection attempts in the event of +/// auth changes or unavailability. 
+/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md> +pub const KAFKA_RECONNECT_BACKOFF_MAX: Config<Duration> = Config::new( + "kafka_reconnect_backoff_max", + Duration::from_secs(30), + "Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.", +); + + // MySQL /// Replication heartbeat interval requested from the MySQL server. @@ -301,6 +335,10 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM) .add(&KAFKA_METADATA_FETCH_INTERVAL) .add(&KAFKA_POLL_MAX_WAIT) + .add(&KAFKA_RETRY_BACKOFF) + .add(&KAFKA_RETRY_BACKOFF_MAX) + .add(&KAFKA_RECONNECT_BACKOFF) + .add(&KAFKA_RECONNECT_BACKOFF_MAX) .add(&MYSQL_OFFSET_KNOWN_INTERVAL) .add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL) .add(&ORE_OVERFLOWING_BEHAVIOR)