Skip to content

Commit

Permalink
cluster: iceberg_target_lag_ms topic property
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiman committed Feb 6, 2025
1 parent c91130b commit ddb8dc0
Show file tree
Hide file tree
Showing 24 changed files with 176 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt, // want a real value?
};

auto random_initial_revision_id
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ metadata_cache::get_default_delete_retention_ms() const {
return config::shard_local_cfg().tombstone_retention_ms();
}

// Cluster-wide fallback for the per-topic Iceberg target lag.
//
// TODO(oren): provisionally reuses the catalog commit interval as the
// default; confirm this is the right cluster config to derive it from.
std::optional<std::chrono::milliseconds>
metadata_cache::get_default_iceberg_target_lag_ms() const {
    auto&& cluster_cfg = config::shard_local_cfg();
    return cluster_cfg.iceberg_catalog_commit_interval_ms();
}

topic_properties metadata_cache::get_default_properties() const {
topic_properties tp;
tp.compression = {get_default_compression()};
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ class metadata_cache {
get_default_record_value_subject_name_strategy() const;
std::optional<std::chrono::milliseconds>
get_default_delete_retention_ms() const;
std::optional<std::chrono::milliseconds>
get_default_iceberg_target_lag_ms() const;

topic_properties get_default_properties() const;
std::optional<partition_assignment>
Expand Down
11 changes: 9 additions & 2 deletions src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"iceberg_delete: {}, "
"iceberg_partition_spec: {}, "
// Adjacent literals are concatenated into the single format string; only the
// final literal may be followed by a comma, otherwise the next literal becomes
// a format argument and every value shifts by one placeholder.
"iceberg_invalid_record_action: {}, "
"iceberg_target_lag_ms: {}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -83,7 +84,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.delete_retention_ms,
properties.iceberg_delete,
properties.iceberg_partition_spec,
properties.iceberg_invalid_record_action);
properties.iceberg_invalid_record_action,
properties.iceberg_target_lag_ms);

if (config::shard_local_cfg().development_enable_cloud_topics()) {
fmt::print(
Expand Down Expand Up @@ -130,7 +132,8 @@ bool topic_properties::has_overrides() const {
|| (iceberg_mode != storage::ntp_config::default_iceberg_mode)
|| leaders_preference.has_value() || delete_retention_ms.is_engaged()
|| iceberg_delete.has_value() || iceberg_partition_spec.has_value()
|| iceberg_invalid_record_action.has_value();
|| iceberg_invalid_record_action.has_value()
|| iceberg_target_lag_ms.has_value();

if (config::shard_local_cfg().development_enable_cloud_topics()) {
return overrides
Expand All @@ -150,6 +153,9 @@ bool topic_properties::requires_remote_erase() const {
&& !read_replica.value_or(false) && remote_delete;
}

// TODO(oren): decide whether target lag is needed in the ntp config overrides
// below; its usage there is not yet clear.
// TODO(oren): add a check somewhere such that, if iceberg is enabled, target
// lag is populated from the cluster config.
storage::ntp_config::default_overrides
topic_properties::get_ntp_cfg_overrides() const {
storage::ntp_config::default_overrides ret;
Expand Down Expand Up @@ -268,6 +274,7 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
};
}

Expand Down
11 changes: 8 additions & 3 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ struct topic_properties
std::optional<bool> iceberg_delete,
std::optional<ss::sstring> iceberg_partition_spec,
std::optional<model::iceberg_invalid_record_action>
iceberg_invalid_record_action)
iceberg_invalid_record_action,
std::optional<std::chrono::milliseconds> iceberg_target_lag_ms)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -123,7 +124,8 @@ struct topic_properties
, delete_retention_ms(delete_retention_ms)
, iceberg_delete(iceberg_delete)
, iceberg_partition_spec(std::move(iceberg_partition_spec))
, iceberg_invalid_record_action(iceberg_invalid_record_action) {}
, iceberg_invalid_record_action(iceberg_invalid_record_action)
, iceberg_target_lag_ms(iceberg_target_lag_ms) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -206,6 +208,8 @@ struct topic_properties
std::optional<model::iceberg_invalid_record_action>
iceberg_invalid_record_action;

std::optional<std::chrono::milliseconds> iceberg_target_lag_ms{};

bool is_compacted() const;
bool has_overrides() const;
bool requires_remote_erase() const;
Expand Down Expand Up @@ -254,7 +258,8 @@ struct topic_properties
delete_retention_ms,
iceberg_delete,
iceberg_partition_spec,
iceberg_invalid_record_action);
iceberg_invalid_record_action,
iceberg_target_lag_ms);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,9 @@ topic_properties topic_table::update_topic_properties(
incremental_update(
updated_properties.iceberg_invalid_record_action,
overrides.iceberg_invalid_record_action);
incremental_update(
updated_properties.iceberg_target_lag_ms,
overrides.iceberg_target_lag_ms);
return updated_properties;
}

Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
"flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, "
"remote_read: {}, remote_write: {}, iceberg_delete: {}, "
"iceberg_partition_spec: {}, "
"iceberg_invalid_record_action: {}",
// Keep the ", " separator used by the preceding fields and do not end the
// output with a dangling comma.
"iceberg_invalid_record_action: {}, "
"iceberg_target_lag_ms: {}",
i.compression,
i.cleanup_policy_bitflags,
i.compaction_strategy,
Expand Down Expand Up @@ -421,7 +422,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
i.remote_write,
i.iceberg_delete,
i.iceberg_partition_spec,
i.iceberg_invalid_record_action);
i.iceberg_invalid_record_action,
i.iceberg_target_lag_ms);
return o;
}

Expand Down
6 changes: 5 additions & 1 deletion src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,9 @@ struct incremental_topic_updates
property_update<std::optional<model::iceberg_invalid_record_action>>
iceberg_invalid_record_action;

property_update<std::optional<std::chrono::milliseconds>>
iceberg_target_lag_ms;

// To allow us to better control use of the deprecated shadow_indexing
// field, use getters and setters instead.
const auto& get_shadow_indexing() const { return shadow_indexing; }
Expand Down Expand Up @@ -685,7 +688,8 @@ struct incremental_topic_updates
delete_retention_ms,
iceberg_delete,
iceberg_partition_spec,
iceberg_invalid_record_action);
iceberg_invalid_record_action,
iceberg_target_lag_ms);
}

friend std::ostream&
Expand Down
5 changes: 5 additions & 0 deletions src/v/compat/cluster_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ struct compat_check<cluster::topic_properties> {
wr,
"iceberg_invalid_record_action",
obj.iceberg_invalid_record_action);
json_write(iceberg_target_lag_ms);
}

static cluster::topic_properties from_json(json::Value& rd) {
Expand Down Expand Up @@ -399,6 +400,7 @@ struct compat_check<cluster::topic_properties> {
json_read(flush_bytes);
json_read(remote_topic_namespace_override);
json_read(iceberg_invalid_record_action);
json_read(iceberg_target_lag_ms);
return obj;
}

Expand Down Expand Up @@ -431,6 +433,7 @@ struct compat_check<cluster::topic_properties> {
obj.flush_ms = std::nullopt;
obj.remote_topic_namespace_override = std::nullopt;
obj.iceberg_invalid_record_action = std::nullopt;
obj.iceberg_target_lag_ms = std::nullopt;

if (reply != obj) {
throw compat_error(fmt::format(
Expand Down Expand Up @@ -518,6 +521,8 @@ struct compat_check<cluster::topic_configuration> {

obj.properties.iceberg_invalid_record_action = std::nullopt;

obj.properties.iceberg_target_lag_ms = std::nullopt;

// ADL will always squash is_migrated to false
obj.is_migrated = false;

Expand Down
5 changes: 5 additions & 0 deletions src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,11 @@ struct instance_generator<cluster::topic_properties> {
return random_generators::random_choice(
{model::iceberg_invalid_record_action::drop,
model::iceberg_invalid_record_action::dlq_table});
}),
tests::random_optional([] {
// TODO(oren): should this be constrained?
// or is that part of the test?
return tests::random_duration_ms();
})};
}

Expand Down
2 changes: 2 additions & 0 deletions src/v/compat/cluster_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ inline void rjson_serialize(
write_member(w, "delete_retention_ms", tps.delete_retention_ms);
write_exceptional_member_type(
w, "iceberg_invalid_record_action", tps.iceberg_invalid_record_action);
write_member(w, "iceberg_target_lag_ms", tps.iceberg_target_lag_ms);
w.EndObject();
}

Expand Down Expand Up @@ -704,6 +705,7 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) {
read_member(rd, "delete_retention_ms", obj.delete_retention_ms);
read_member(
rd, "iceberg_invalid_record_action", obj.iceberg_invalid_record_action);
read_member(rd, "iceberg_target_lag_ms", obj.iceberg_target_lag_ms);
}

inline void rjson_serialize(
Expand Down
16 changes: 15 additions & 1 deletion src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ create_topic_properties_update(
std::apply(apply_op(op_t::none), update.custom_properties.serde_fields());

static_assert(
std::tuple_size_v<decltype(update.properties.serde_fields())> == 34,
std::tuple_size_v<decltype(update.properties.serde_fields())> == 35,
"If you added a property, please decide on it's default alter config "
"policy, and handle the update in the loop below");
static_assert(
Expand Down Expand Up @@ -385,6 +385,20 @@ create_topic_properties_update(
kafka::config_resource_operation::set);
continue;
}
if (cfg.name == topic_property_iceberg_target_lag_ms) {
parse_and_set_optional(
update.properties.iceberg_target_lag_ms,
cfg.value,
kafka::config_resource_operation::set,
iceberg_target_lag_ms_validator{},
[](const ss::sstring& v) {
auto parsed
= boost::lexical_cast<std::chrono::milliseconds::rep>(
v);
return std::chrono::milliseconds{parsed};
});
continue;
}
} catch (const validation_error& e) {
return make_error_alter_config_resource_response<
alter_configs_resource_response>(
Expand Down
17 changes: 17 additions & 0 deletions src/v/kafka/server/handlers/configs/config_response_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,23 @@ config_response_container_t make_topic_configs(
include_documentation,
"Action to take when an invalid record is encountered."),
&describe_as_string<model::iceberg_invalid_record_action>);

// TODO(oren): I really don't understand this function at all
add_topic_config_if_requested(
config_keys,
result,
topic_property_iceberg_target_lag_ms,
// TODO(oren): really not clear that this is the right default
config::shard_local_cfg().iceberg_catalog_commit_interval_ms(),
topic_property_iceberg_target_lag_ms,
topic_properties.iceberg_target_lag_ms,
include_synonyms,
maybe_make_documentation(
include_documentation,
"Best effort target for Iceberg table lag relative to source "
"topic, in "
"milliseconds"),
describe_as_string<std::chrono::milliseconds>);
}

return result;
Expand Down
19 changes: 19 additions & 0 deletions src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "pandaproxy/schema_registry/schema_id_validation.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
#include "security/acl.h"
#include "serde/rw/chrono.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/sstring.hh>
Expand Down Expand Up @@ -519,6 +520,24 @@ struct iceberg_partition_spec_validator {
}
};

// TODO(oren): what's the actual range we want?
//
// Validator for the iceberg target lag topic property.
//
// An unset value is always accepted. A set value must lie within
// [min_target_lag, serde::max_serializable_ms]; otherwise a human-readable
// error message is returned. Returns std::nullopt when the value is valid.
struct iceberg_target_lag_ms_validator {
    // Smallest accepted target lag. Used by both the range check and the
    // error message so the reported bound always matches the enforced one.
    static constexpr std::chrono::milliseconds min_target_lag{10};

    std::optional<ss::sstring> operator()(
      const ss::sstring& /*raw*/,
      const std::optional<std::chrono::milliseconds>& maybe_value) {
        if (maybe_value.has_value()) {
            const auto& value = maybe_value.value();
            if (value < min_target_lag || value > serde::max_serializable_ms) {
                return fmt::format(
                  "target.lag.ms value invalid, expected to be in range [{}, "
                  "{}]",
                  min_target_lag,
                  serde::max_serializable_ms);
            }
        }
        return std::nullopt;
    }
};

template<typename T, typename... ValidatorTypes>
requires requires(
model::topic_namespace_view tns,
Expand Down
6 changes: 4 additions & 2 deletions src/v/kafka/server/handlers/create_topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ bool is_supported(std::string_view name) {
topic_property_delete_retention_ms,
topic_property_iceberg_delete,
topic_property_iceberg_partition_spec,
topic_property_iceberg_invalid_record_action});
topic_property_iceberg_invalid_record_action,
topic_property_iceberg_target_lag_ms});

if (std::any_of(
supported_configs.begin(),
Expand Down Expand Up @@ -122,7 +123,8 @@ using validators = make_validator_types<
iceberg_config_validator,
iceberg_invalid_record_action_validator,
cloud_topic_config_validator,
delete_retention_ms_validator>;
delete_retention_ms_validator,
iceberg_target_lag_ms_validator>;

static void
append_topic_configs(request_context& ctx, create_topics_response& response) {
Expand Down
14 changes: 14 additions & 0 deletions src/v/kafka/server/handlers/incremental_alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,20 @@ create_topic_properties_update(
op);
continue;
}
if (cfg.name == topic_property_iceberg_target_lag_ms) {
parse_and_set_optional(
update.properties.iceberg_target_lag_ms,
cfg.value,
op,
iceberg_target_lag_ms_validator{},
[](const ss::sstring& v) {
auto parsed
= boost::lexical_cast<std::chrono::milliseconds::rep>(
v);
return std::chrono::milliseconds{parsed};
});
continue;
}

} catch (const validation_error& e) {
vlog(
Expand Down
13 changes: 13 additions & 0 deletions src/v/kafka/server/handlers/topics/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ get_delete_retention_ms(const config_map_t& config) {
return delete_retention_ms;
}

// Reads the iceberg target lag topic property from the request's config map.
// Yields std::nullopt when the property is absent; otherwise the configured
// count is interpreted as milliseconds.
static std::optional<std::chrono::milliseconds>
get_iceberg_target_lag_ms(const config_map_t& config) {
    using rep_t = std::chrono::milliseconds::rep;

    auto raw_ms = get_config_value<rep_t>(
      config, topic_property_iceberg_target_lag_ms);
    if (!raw_ms.has_value()) {
        return std::nullopt;
    }
    return std::chrono::milliseconds{raw_ms.value()};
}

cluster::custom_assignable_topic_configuration
to_cluster_type(const creatable_topic& t) {
auto cfg = cluster::topic_configuration(
Expand Down Expand Up @@ -280,6 +290,9 @@ to_cluster_type(const creatable_topic& t) {
= get_enum_value<model::iceberg_invalid_record_action>(
config_entries, topic_property_iceberg_invalid_record_action);

cfg.properties.iceberg_target_lag_ms = get_iceberg_target_lag_ms(
config_entries);

schema_id_validation_config_parser schema_id_validation_config_parser{
cfg.properties};

Expand Down
3 changes: 3 additions & 0 deletions src/v/kafka/server/handlers/topics/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ inline constexpr std::string_view topic_property_iceberg_partition_spec
inline constexpr std::string_view topic_property_iceberg_invalid_record_action
= "redpanda.iceberg.invalid.record.action";

// Topic property key for the best-effort target for Iceberg table lag relative
// to the source topic, in milliseconds.
inline constexpr std::string_view topic_property_iceberg_target_lag_ms
= "redpanda.iceberg.target.lag.ms";

// Kafka topic properties that is not relevant for Redpanda
// Or cannot be altered with kafka alter handler
inline constexpr std::array<std::string_view, 20> allowlist_topic_noop_confs = {
Expand Down
Loading

0 comments on commit ddb8dc0

Please sign in to comment.