4 changes: 4 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
@@ -575,6 +575,10 @@ def get_default_system_parameters(
"storage_statistics_retention_duration",
"enable_paused_cluster_readhold_downgrade",
"enable_builtin_migration_schema_evolution",
"kafka_retry_backoff",
"kafka_retry_backoff_max",
"kafka_reconnect_backoff",
"kafka_reconnect_backoff_max",
Contributor:
Doesn't hurt to have them configurable in tests?

Contributor (author):
I didn't think this prevented configuring them in tests. My understanding is that these can still be set via additional_system_parameter_defaults or with ALTER SYSTEM SET if needed.

I put them in this list because they didn't seem to belong in the other camps (per the comment). We set reasonable defaults in the code, and testing variations of the retry backoff settings seems unlikely to surface meaningful bugs.

     ]


4 changes: 4 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
@@ -1517,6 +1517,10 @@ def __init__(
"enable_paused_cluster_readhold_downgrade",
"enable_with_ordinality_legacy_fallback",
"enable_builtin_migration_schema_evolution",
"kafka_retry_backoff",
"kafka_retry_backoff_max",
"kafka_reconnect_backoff",
"kafka_reconnect_backoff_max",
         ]

     def run(self, exe: Executor) -> bool:
32 changes: 31 additions & 1 deletion src/storage-types/src/connections.rs
@@ -60,7 +60,8 @@ use crate::connections::string_or_secret::StringOrSecret;
 use crate::controller::AlterError;
 use crate::dyncfgs::{
     ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
-    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM,
+    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
+    KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
 };
 use crate::errors::{ContextCreationError, CsrConnectError};

@@ -852,6 +853,35 @@ impl KafkaConnection {
             }
         }

+        options.insert(
+            "retry.backoff.ms".into(),
+            KAFKA_RETRY_BACKOFF
+                .get(storage_configuration.config_set())
+                .as_millis()
+                .into(),
+        );
+        options.insert(
+            "retry.backoff.max.ms".into(),
+            KAFKA_RETRY_BACKOFF_MAX
+                .get(storage_configuration.config_set())
+                .as_millis()
+                .into(),
+        );
+        options.insert(
+            "reconnect.backoff.ms".into(),
+            KAFKA_RECONNECT_BACKOFF
+                .get(storage_configuration.config_set())
+                .as_millis()
+                .into(),
+        );
+        options.insert(
+            "reconnect.backoff.max.ms".into(),
+            KAFKA_RECONNECT_BACKOFF_MAX
+                .get(storage_configuration.config_set())
+                .as_millis()
+                .into(),
+        );

         let mut config = mz_kafka_util::client::create_new_client_config(
             storage_configuration
                 .connection_context
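For context: librdkafka takes each of these settings as a stringified millisecond count, which is why every dyncfg read above ends in `.as_millis()`. Below is a minimal standalone sketch of the same mapping, assuming the `rdkafka` crate's `ClientConfig`; it illustrates the option names and units, and is not Materialize's actual code path:

```rust
use std::time::Duration;

use rdkafka::ClientConfig;

// Hypothetical helper: applies retry/reconnect backoffs to a raw
// librdkafka client config. Materialize instead reads these values from
// the dyncfgs shown in this PR.
fn apply_backoffs(
    config: &mut ClientConfig,
    retry: Duration,
    retry_max: Duration,
    reconnect: Duration,
    reconnect_max: Duration,
) {
    // librdkafka expects millisecond counts for all four options.
    config.set("retry.backoff.ms", retry.as_millis().to_string());
    config.set("retry.backoff.max.ms", retry_max.as_millis().to_string());
    config.set("reconnect.backoff.ms", reconnect.as_millis().to_string());
    config.set("reconnect.backoff.max.ms", reconnect_max.as_millis().to_string());
}
```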
38 changes: 38 additions & 0 deletions src/storage-types/src/dyncfgs.rs
@@ -129,6 +129,40 @@ pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config(
     most this number of elements.",
 );

+/// Sets retry.backoff.ms in librdkafka for sources and sinks.
+/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
+pub const KAFKA_RETRY_BACKOFF: Config<Duration> = Config::new(
+    "kafka_retry_backoff",
+    Duration::from_millis(100),
+    "Sets retry.backoff.ms in librdkafka for sources and sinks.",
+);
+
+/// Sets retry.backoff.max.ms in librdkafka for sources and sinks.
+/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
+pub const KAFKA_RETRY_BACKOFF_MAX: Config<Duration> = Config::new(
+    "kafka_retry_backoff_max",
+    Duration::from_secs(1),
+    "Sets retry.backoff.max.ms in librdkafka for sources and sinks.",
+);
+
+/// Sets reconnect.backoff.ms in librdkafka for sources and sinks.
+/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
+pub const KAFKA_RECONNECT_BACKOFF: Config<Duration> = Config::new(
+    "kafka_reconnect_backoff",
+    Duration::from_millis(100),
+    "Sets reconnect.backoff.ms in librdkafka for sources and sinks.",
+);
+
+/// Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.
+/// We default to 30s instead of 10s to avoid constant reconnection attempts in the event of
+/// auth changes or unavailability.
+/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
+pub const KAFKA_RECONNECT_BACKOFF_MAX: Config<Duration> = Config::new(
+    "kafka_reconnect_backoff_max",
+    Duration::from_secs(30),
+    "Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.",
+);
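Per the librdkafka CONFIGURATION page linked above, each `*.backoff.ms` value is the initial delay, doubled on successive attempts and capped by the matching `*.backoff.max.ms` (reconnect backoffs additionally get jitter). A small sketch of that schedule, ignoring jitter, to show what these defaults imply:

```rust
use std::time::Duration;

// Illustrative only: the exponential schedule librdkafka applies between
// attempts, minus the jitter it adds to reconnect backoffs.
fn backoff_schedule(initial: Duration, max: Duration, attempts: u32) -> Vec<Duration> {
    (0..attempts)
        .map(|i| (initial * 2u32.saturating_pow(i)).min(max))
        .collect()
}

fn main() {
    // Retry defaults above (100ms initial, 1s cap):
    // 100ms, 200ms, 400ms, 800ms, 1s, 1s
    println!("{:?}", backoff_schedule(Duration::from_millis(100), Duration::from_secs(1), 6));

    // Reconnect defaults (100ms initial, 30s cap): the delay keeps growing
    // until it flattens at 30s.
    println!("{:?}", backoff_schedule(Duration::from_millis(100), Duration::from_secs(30), 12));
}
```

With the 30s cap, a broker that is down or rejecting authentication sees a reconnection attempt roughly every 30 seconds at steady state rather than every 10, which is the rationale given in the doc comment above.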

// MySQL

/// Replication heartbeat interval requested from the MySQL server.
@@ -301,6 +335,10 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
         .add(&KAFKA_METADATA_FETCH_INTERVAL)
         .add(&KAFKA_POLL_MAX_WAIT)
+        .add(&KAFKA_RETRY_BACKOFF)
+        .add(&KAFKA_RETRY_BACKOFF_MAX)
+        .add(&KAFKA_RECONNECT_BACKOFF)
+        .add(&KAFKA_RECONNECT_BACKOFF_MAX)
         .add(&MYSQL_OFFSET_KNOWN_INTERVAL)
         .add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
         .add(&ORE_OVERFLOWING_BEHAVIOR)
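Once registered here, the new configs are readable wherever a `ConfigSet` is threaded through, which is what the connections.rs change above does via `storage_configuration.config_set()`. A hedged sketch of the read path; it assumes `mz_dyncfg::ConfigSet` implements `Default` and that these items are importable as written, whereas inside Materialize the set comes from the storage configuration rather than being built ad hoc:

```rust
use mz_dyncfg::ConfigSet;
use mz_storage_types::dyncfgs::{KAFKA_RETRY_BACKOFF, all_dyncfgs};

fn main() {
    // Register every storage dyncfg, including the four added above.
    let configs = all_dyncfgs(ConfigSet::default());

    // Reads return the compiled-in default (100ms here) until the value
    // is overridden, e.g. via ALTER SYSTEM SET kafka_retry_backoff.
    let backoff = KAFKA_RETRY_BACKOFF.get(&configs);
    assert_eq!(backoff.as_millis(), 100);
}
```

As the review thread above notes, tests can still override these via `additional_system_parameter_defaults` or at runtime with `ALTER SYSTEM SET`.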