Skip to content

Commit 90c92ba

Browse files
committed
librdkafka: expose retry and reconnect backoff settings
- set default reconnect backoff max to 30s
1 parent 5ffd8e1 commit 90c92ba

File tree

4 files changed

+77
-1
lines changed

4 files changed

+77
-1
lines changed

misc/python/materialize/mzcompose/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,10 @@ def get_default_system_parameters(
575575
"storage_statistics_retention_duration",
576576
"enable_paused_cluster_readhold_downgrade",
577577
"enable_builtin_migration_schema_evolution",
578+
"kafka_retry_backoff",
579+
"kafka_retry_backoff_max",
580+
"kafka_reconnect_backoff",
581+
"kafka_reconnect_backoff_max",
578582
]
579583

580584

misc/python/materialize/parallel_workload/action.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,6 +1517,10 @@ def __init__(
15171517
"enable_paused_cluster_readhold_downgrade",
15181518
"enable_with_ordinality_legacy_fallback",
15191519
"enable_builtin_migration_schema_evolution",
1520+
"kafka_retry_backoff",
1521+
"kafka_retry_backoff_max",
1522+
"kafka_reconnect_backoff",
1523+
"kafka_reconnect_backoff_max",
15201524
]
15211525

15221526
def run(self, exe: Executor) -> bool:

src/storage-types/src/connections.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ use crate::connections::string_or_secret::StringOrSecret;
6060
use crate::controller::AlterError;
6161
use crate::dyncfgs::{
6262
ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
63-
KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM,
63+
KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
64+
KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
6465
};
6566
use crate::errors::{ContextCreationError, CsrConnectError};
6667

@@ -852,6 +853,35 @@ impl KafkaConnection {
852853
}
853854
}
854855

856+
options.insert(
857+
"retry.backoff.ms".into(),
858+
KAFKA_RETRY_BACKOFF
859+
.get(storage_configuration.config_set())
860+
.as_millis()
861+
.into(),
862+
);
863+
options.insert(
864+
"retry.backoff.max.ms".into(),
865+
KAFKA_RETRY_BACKOFF_MAX
866+
.get(storage_configuration.config_set())
867+
.as_millis()
868+
.into(),
869+
);
870+
options.insert(
871+
"reconnect.backoff.ms".into(),
872+
KAFKA_RECONNECT_BACKOFF
873+
.get(storage_configuration.config_set())
874+
.as_millis()
875+
.into(),
876+
);
877+
options.insert(
878+
"reconnect.backoff.max.ms".into(),
879+
KAFKA_RECONNECT_BACKOFF_MAX
880+
.get(storage_configuration.config_set())
881+
.as_millis()
882+
.into(),
883+
);
884+
855885
let mut config = mz_kafka_util::client::create_new_client_config(
856886
storage_configuration
857887
.connection_context

src/storage-types/src/dyncfgs.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,40 @@ pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config
129129
most this number of elements.",
130130
);
131131

132+
/// Sets retry.backoff.ms in librdkafka for sources and sinks.
133+
/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
134+
pub const KAFKA_RETRY_BACKOFF: Config<Duration> = Config::new(
135+
"kafka_retry_backoff",
136+
Duration::from_millis(100),
137+
"Sets retry.backoff.ms in librdkafka for sources and sinks.",
138+
);
139+
140+
/// Sets retry.backoff.max.ms in librdkafka for sources and sinks.
141+
/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
142+
pub const KAFKA_RETRY_BACKOFF_MAX: Config<Duration> = Config::new(
143+
"kafka_retry_backoff_max",
144+
Duration::from_secs(1),
145+
"Sets retry.backoff.max.ms in librdkafka for sources and sinks.",
146+
);
147+
148+
/// Sets reconnect.backoff.ms in librdkafka for sources and sinks.
149+
/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
150+
pub const KAFKA_RECONNECT_BACKOFF: Config<Duration> = Config::new(
151+
"kafka_reconnect_backoff",
152+
Duration::from_millis(100),
153+
"Sets reconnect.backoff.ms in librdkafka for sources and sinks.",
154+
);
155+
156+
/// Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.
157+
/// We default to 30s instead of 10s to avoid constant reconnection attempts in the event of
158+
/// auth changes or unavailability.
159+
/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
160+
pub const KAFKA_RECONNECT_BACKOFF_MAX: Config<Duration> = Config::new(
161+
"kafka_reconnect_backoff_max",
162+
Duration::from_secs(30),
163+
"Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.",
164+
);
165+
132166
// MySQL
133167

134168
/// Replication heartbeat interval requested from the MySQL server.
@@ -301,6 +335,10 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
301335
.add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
302336
.add(&KAFKA_METADATA_FETCH_INTERVAL)
303337
.add(&KAFKA_POLL_MAX_WAIT)
338+
.add(&KAFKA_RETRY_BACKOFF)
339+
.add(&KAFKA_RETRY_BACKOFF_MAX)
340+
.add(&KAFKA_RECONNECT_BACKOFF)
341+
.add(&KAFKA_RECONNECT_BACKOFF_MAX)
304342
.add(&MYSQL_OFFSET_KNOWN_INTERVAL)
305343
.add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
306344
.add(&ORE_OVERFLOWING_BEHAVIOR)

0 commit comments

Comments (0)