Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -7855,6 +7855,48 @@ rd_kafka_ConfigEntry_is_sensitive(const rd_kafka_ConfigEntry_t *entry);
RD_EXPORT int
rd_kafka_ConfigEntry_is_synonym(const rd_kafka_ConfigEntry_t *entry);

/**
* @enum rd_kafka_ConfigType_t
* @brief Apache Kafka config types.
*/
typedef enum rd_kafka_ConfigType_t {
RD_KAFKA_CONFIG_UNKNOWN = 0,
RD_KAFKA_CONFIG_BOOLEAN = 1,
RD_KAFKA_CONFIG_STRING = 2,
RD_KAFKA_CONFIG_INT = 3,
RD_KAFKA_CONFIG_SHORT = 4,
RD_KAFKA_CONFIG_LONG = 5,
RD_KAFKA_CONFIG_DOUBLE = 6,
RD_KAFKA_CONFIG_LIST = 7,
RD_KAFKA_CONFIG_CLASS = 8,
RD_KAFKA_CONFIG_PASSWORD = 9,
RD_KAFKA_CONFIG__CNT,
} rd_kafka_ConfigType_t;

/**
* @returns the config type.
*
* @param entry Entry to get type for.
*
* @remark The lifetime of the returned entry is the same as \p conf .
Copy link

Copilot AI Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter reference \\p conf is incorrect. It should reference \\p entry since that's the actual parameter name for this function.

Copilot uses AI. Check for mistakes.

* @remark Shall only be used on a DescribeConfigs result,
* otherwise returns RD_KAFKA_CONFIG_UNKNOWN.
*/
RD_EXPORT const rd_kafka_ConfigType_t
rd_kafka_ConfigEntry_type(const rd_kafka_ConfigEntry_t *entry);

/**
* @returns the config documentation.
*
* @param entry Entry to get documentation for.
*
* @remark The lifetime of the returned entry is the same as \p conf .
Copy link

Copilot AI Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter reference \\p conf is incorrect. It should reference \\p entry since that's the actual parameter name for this function.

Copilot uses AI. Check for mistakes.

* @remark Shall only be used on a DescribeConfigs result,
* otherwise returns NULL.
*/
RD_EXPORT const char *
rd_kafka_ConfigEntry_documentation(const rd_kafka_ConfigEntry_t *entry);


/**
* @returns the synonym config entry array.
Expand Down
39 changes: 34 additions & 5 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2853,6 +2853,16 @@ rd_kafka_ConfigEntry_synonyms(const rd_kafka_ConfigEntry_t *entry,
return (const rd_kafka_ConfigEntry_t **)entry->synonyms.rl_elems;
}

const rd_kafka_ConfigType_t
rd_kafka_ConfigEntry_type(const rd_kafka_ConfigEntry_t *entry) {
return entry->type;
}

const char *
rd_kafka_ConfigEntry_documentation(const rd_kafka_ConfigEntry_t *entry) {
return entry->documentation;
}


/**@}*/

Expand Down Expand Up @@ -3662,12 +3672,15 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
int32_t Throttle_Time;
rd_kafka_ConfigResource_t *config = NULL;
rd_kafka_ConfigEntry_t *entry = NULL;
int16_t api_version;

api_version = rd_kafka_buf_ApiVersion(reply);

Comment on lines +3675 to 3678
Copy link

Copilot AI Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The variable api_version should be declared at the point of initialization rather than separately. Consider combining the declaration and assignment: int16_t api_version = rd_kafka_buf_ApiVersion(reply);

Suggested change
int16_t api_version;
api_version = rd_kafka_buf_ApiVersion(reply);
int16_t api_version = rd_kafka_buf_ApiVersion(reply);

Copilot uses AI. Check for mistakes.

rd_kafka_buf_read_i32(reply, &Throttle_Time);
rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);

/* #resources */
rd_kafka_buf_read_i32(reply, &res_cnt);
rd_kafka_buf_read_arraycnt(reply, &res_cnt, RD_KAFKAP_CONFIGS_MAX);

if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
rd_kafka_buf_parse_fail(
Expand Down Expand Up @@ -3728,10 +3741,13 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
config->errstr = rd_strdup(this_errstr);

/* #config_entries */
rd_kafka_buf_read_i32(reply, &entry_cnt);
rd_kafka_buf_read_arraycnt(reply, &entry_cnt,
RD_KAFKAP_CONFIGS_MAX);

for (ci = 0; ci < (int)entry_cnt; ci++) {
rd_kafkap_str_t config_name, config_value;
rd_kafkap_str_t config_name, config_value,
documentation;
int8_t config_type;
int32_t syn_cnt;
int si;

Expand Down Expand Up @@ -3767,9 +3783,10 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_buf_read_bool(reply, &entry->a.is_sensitive);


if (rd_kafka_buf_ApiVersion(reply) == 1) {
if (rd_kafka_buf_ApiVersion(reply) >= 1) {
/* #config_synonyms (ApiVersion 1) */
rd_kafka_buf_read_i32(reply, &syn_cnt);
rd_kafka_buf_read_arraycnt(
reply, &syn_cnt, RD_KAFKAP_CONFIGS_MAX);

if (syn_cnt > 100000)
rd_kafka_buf_parse_fail(
Expand Down Expand Up @@ -3800,6 +3817,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_buf_read_str(reply, &syn_name);
rd_kafka_buf_read_str(reply, &syn_value);
rd_kafka_buf_read_i8(reply, &syn_source);
rd_kafka_buf_skip_tags(reply);

syn_entry = rd_kafka_ConfigEntry_new0(
syn_name.str, RD_KAFKAP_STR_LEN(&syn_name),
Expand All @@ -3825,9 +3843,19 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
rd_list_add(&entry->synonyms, syn_entry);
}

if (api_version >= 3) {
rd_kafka_buf_read_i8(reply, &config_type);
Copy link

Copilot AI Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assignment to entry->type should cast config_type to rd_kafka_ConfigType_t for type safety, since config_type is declared as int8_t but the field expects rd_kafka_ConfigType_t.

Suggested change
rd_kafka_buf_read_i8(reply, &config_type);
entry->type = (rd_kafka_ConfigType_t)config_type;

Copilot uses AI. Check for mistakes.

rd_kafka_buf_read_str(reply, &documentation);
entry->type = config_type;
entry->documentation =
RD_KAFKAP_STR_DUP(&documentation);
}
rd_kafka_buf_skip_tags(reply);

rd_kafka_ConfigResource_add_ConfigEntry(config, entry);
entry = NULL;
}
rd_kafka_buf_skip_tags(reply);

/* As a convenience to the application we insert result
* in the same order as they were requested. The broker
Expand Down Expand Up @@ -3856,6 +3884,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
config);
config = NULL;
}
rd_kafka_buf_skip_tags(reply);

*rko_resultp = rko_result;

Expand Down
4 changes: 3 additions & 1 deletion src/rdkafka_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ struct rd_kafka_ConfigEntry_s {
rd_bool_t is_synonym; /**< Value is synonym */
} a;

rd_list_t synonyms; /**< Type (rd_kafka_configEntry *) */
rd_list_t synonyms; /**< Type (rd_kafka_configEntry *) */
rd_kafka_ConfigType_t type; /**< Config type */
char *documentation; /**< Config documentation */
};

/**
Expand Down
23 changes: 15 additions & 8 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -5511,7 +5511,7 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest(
}

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL);
rkb, RD_KAFKAP_DescribeConfigs, 0, 4, NULL);
if (ApiVersion == -1) {
rd_snprintf(errstr, errstr_size,
"DescribeConfigs (KIP-133) not supported "
Expand All @@ -5520,11 +5520,12 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest(
return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
}

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeConfigs, 1,
rd_list_cnt(configs) * 200);
rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_DescribeConfigs,
1, rd_list_cnt(configs) * 200,
ApiVersion >= 4);

/* #resources */
rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs));
rd_kafka_buf_write_arraycnt(rkbuf, rd_list_cnt(configs));

RD_LIST_FOREACH(config, configs, i) {
const rd_kafka_ConfigEntry_t *entry;
Expand All @@ -5541,25 +5542,31 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest(
/* #config */
if (rd_list_empty(&config->config)) {
/* Get all configs */
rd_kafka_buf_write_i32(rkbuf, -1);
rd_kafka_buf_write_arraycnt(rkbuf, -1);
} else {
/* Get requested configs only */
rd_kafka_buf_write_i32(rkbuf,
rd_list_cnt(&config->config));
rd_kafka_buf_write_arraycnt(
rkbuf, rd_list_cnt(&config->config));
}

RD_LIST_FOREACH(entry, &config->config, ei) {
/* config_name */
rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
}
rd_kafka_buf_write_tags_empty(rkbuf);
}


if (ApiVersion == 1) {
if (ApiVersion >= 1) {
/* include_synonyms */
rd_kafka_buf_write_i8(rkbuf, 1);
}

if (ApiVersion >= 3) {
/* include_documentation */
rd_kafka_buf_write_i8(rkbuf, 1);
}

/* timeout */
op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
Expand Down
10 changes: 8 additions & 2 deletions tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,9 @@ static void test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t **entries,
"%s#%" PRIusz "/%" PRIusz
": Source %s (%d): \"%s\"=\"%s\" "
"[is read-only=%s, default=%s, sensitive=%s, "
"synonym=%s] with %" PRIusz " synonym(s)\n",
"synonym=%s] with %" PRIusz
" synonym(s) ConfigType: (%d) "
"Documentation: %s\n",
indent, ei, entry_cnt,
rd_kafka_ConfigSource_name(rd_kafka_ConfigEntry_source(e)),
rd_kafka_ConfigEntry_source(e),
Expand All @@ -693,7 +695,11 @@ static void test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t **entries,
YN(rd_kafka_ConfigEntry_is_read_only(e)),
YN(rd_kafka_ConfigEntry_is_default(e)),
YN(rd_kafka_ConfigEntry_is_sensitive(e)),
YN(rd_kafka_ConfigEntry_is_synonym(e)), syn_cnt);
YN(rd_kafka_ConfigEntry_is_synonym(e)), syn_cnt,
rd_kafka_ConfigEntry_type(e),
rd_kafka_ConfigEntry_documentation(e)
? rd_kafka_ConfigEntry_documentation(e)
: "(NULL)");
#undef YN

if (syn_cnt > 0)
Expand Down