diff --git a/src/rdkafka.h b/src/rdkafka.h index c0895f93c..9b50db6ad 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -7855,6 +7855,48 @@ rd_kafka_ConfigEntry_is_sensitive(const rd_kafka_ConfigEntry_t *entry); RD_EXPORT int rd_kafka_ConfigEntry_is_synonym(const rd_kafka_ConfigEntry_t *entry); +/** + * @enum rd_kafka_ConfigType_t + * @brief Apache Kafka config types. + */ +typedef enum rd_kafka_ConfigType_t { + RD_KAFKA_CONFIG_UNKNOWN = 0, + RD_KAFKA_CONFIG_BOOLEAN = 1, + RD_KAFKA_CONFIG_STRING = 2, + RD_KAFKA_CONFIG_INT = 3, + RD_KAFKA_CONFIG_SHORT = 4, + RD_KAFKA_CONFIG_LONG = 5, + RD_KAFKA_CONFIG_DOUBLE = 6, + RD_KAFKA_CONFIG_LIST = 7, + RD_KAFKA_CONFIG_CLASS = 8, + RD_KAFKA_CONFIG_PASSWORD = 9, + RD_KAFKA_CONFIG__CNT, +} rd_kafka_ConfigType_t; + +/** + * @returns the config type. + * + * @param entry Entry to get type for. + * + * @remark The lifetime of the returned entry is the same as \p conf . + * @remark Shall only be used on a DescribeConfigs result, + * otherwise returns RD_KAFKA_CONFIG_UNKNOWN. + */ +RD_EXPORT const rd_kafka_ConfigType_t +rd_kafka_ConfigEntry_type(const rd_kafka_ConfigEntry_t *entry); + +/** + * @returns the config documentation. + * + * @param entry Entry to get documentation for. + * + * @remark The lifetime of the returned entry is the same as \p conf . + * @remark Shall only be used on a DescribeConfigs result, + * otherwise returns NULL. + */ +RD_EXPORT const char * +rd_kafka_ConfigEntry_documentation(const rd_kafka_ConfigEntry_t *entry); + /** * @returns the synonym config entry array. diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index b2671f3c8..83e823288 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -2853,6 +2853,16 @@ rd_kafka_ConfigEntry_synonyms(const rd_kafka_ConfigEntry_t *entry, return (const rd_kafka_ConfigEntry_t **)entry->synonyms.rl_elems; } +const rd_kafka_ConfigType_t +rd_kafka_ConfigEntry_type(const rd_kafka_ConfigEntry_t *entry) { + return entry->type; +} + +const char * +rd_kafka_ConfigEntry_documentation(const rd_kafka_ConfigEntry_t *entry) { + return entry->documentation; +} + /**@}*/ @@ -3662,12 +3672,15 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, int32_t Throttle_Time; rd_kafka_ConfigResource_t *config = NULL; rd_kafka_ConfigEntry_t *entry = NULL; + int16_t api_version; + + api_version = rd_kafka_buf_ApiVersion(reply); rd_kafka_buf_read_i32(reply, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); /* #resources */ - rd_kafka_buf_read_i32(reply, &res_cnt); + rd_kafka_buf_read_arraycnt(reply, &res_cnt, RD_KAFKAP_CONFIGS_MAX); if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) rd_kafka_buf_parse_fail( @@ -3728,10 +3741,13 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, config->errstr = rd_strdup(this_errstr); /* #config_entries */ - rd_kafka_buf_read_i32(reply, &entry_cnt); + rd_kafka_buf_read_arraycnt(reply, &entry_cnt, + RD_KAFKAP_CONFIGS_MAX); for (ci = 0; ci < (int)entry_cnt; ci++) { - rd_kafkap_str_t config_name, config_value; + rd_kafkap_str_t config_name, config_value, + documentation; + int8_t config_type; int32_t syn_cnt; int si; @@ -3767,9 +3783,10 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_buf_read_bool(reply, &entry->a.is_sensitive); - if (rd_kafka_buf_ApiVersion(reply) == 1) { + if (rd_kafka_buf_ApiVersion(reply) >= 1) { /* #config_synonyms (ApiVersion 1) */ - rd_kafka_buf_read_i32(reply, &syn_cnt); + rd_kafka_buf_read_arraycnt( + reply, &syn_cnt, RD_KAFKAP_CONFIGS_MAX); if (syn_cnt > 100000) rd_kafka_buf_parse_fail( @@ -3800,6 +3817,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_buf_read_str(reply, &syn_name); rd_kafka_buf_read_str(reply, &syn_value); rd_kafka_buf_read_i8(reply, &syn_source); + rd_kafka_buf_skip_tags(reply); syn_entry = rd_kafka_ConfigEntry_new0( syn_name.str, RD_KAFKAP_STR_LEN(&syn_name), @@ -3825,9 +3843,19 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_list_add(&entry->synonyms, syn_entry); } + if (api_version >= 3) { + rd_kafka_buf_read_i8(reply, &config_type); + rd_kafka_buf_read_str(reply, &documentation); + entry->type = config_type; + entry->documentation = + RD_KAFKAP_STR_DUP(&documentation); + } + rd_kafka_buf_skip_tags(reply); + rd_kafka_ConfigResource_add_ConfigEntry(config, entry); entry = NULL; } + rd_kafka_buf_skip_tags(reply); /* As a convenience to the application we insert result * in the same order as they were requested. The broker @@ -3856,6 +3884,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, config); config = NULL; } + rd_kafka_buf_skip_tags(reply); *rko_resultp = rko_result; diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index c84849ea6..0b96080c4 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -233,7 +233,9 @@ struct rd_kafka_ConfigEntry_s { rd_bool_t is_synonym; /**< Value is synonym */ } a; - rd_list_t synonyms; /**< Type (rd_kafka_configEntry *) */ + rd_list_t synonyms; /**< Type (rd_kafka_configEntry *) */ + rd_kafka_ConfigType_t type; /**< Config type */ + char *documentation; /**< Config documentation */ }; /** diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 17ece7fdb..861f13d2f 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -5511,7 +5511,7 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( } ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL); + rkb, RD_KAFKAP_DescribeConfigs, 0, 4, NULL); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "DescribeConfigs (KIP-133) not supported " @@ -5520,11 +5520,12 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeConfigs, 1, - rd_list_cnt(configs) * 200); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_DescribeConfigs, + 1, rd_list_cnt(configs) * 200, + ApiVersion >= 4); /* #resources */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs)); + rd_kafka_buf_write_arraycnt(rkbuf, rd_list_cnt(configs)); RD_LIST_FOREACH(config, configs, i) { const rd_kafka_ConfigEntry_t *entry; @@ -5541,25 +5542,31 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( /* #config */ if (rd_list_empty(&config->config)) { /* Get all configs */ - rd_kafka_buf_write_i32(rkbuf, -1); + rd_kafka_buf_write_arraycnt(rkbuf, -1); } else { /* Get requested configs only */ - rd_kafka_buf_write_i32(rkbuf, - rd_list_cnt(&config->config)); + rd_kafka_buf_write_arraycnt( + rkbuf, rd_list_cnt(&config->config)); } RD_LIST_FOREACH(entry, &config->config, ei) { /* config_name */ rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); } + rd_kafka_buf_write_tags_empty(rkbuf); } - if (ApiVersion == 1) { + if (ApiVersion >= 1) { /* include_synonyms */ rd_kafka_buf_write_i8(rkbuf, 1); } + if (ApiVersion >= 3) { + /* include_documentation */ + rd_kafka_buf_write_i8(rkbuf, 1); + } + /* timeout */ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) diff --git a/tests/0081-admin.c b/tests/0081-admin.c index f16f958e5..d765de3c1 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -682,7 +682,9 @@ static void test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t **entries, "%s#%" PRIusz "/%" PRIusz ": Source %s (%d): \"%s\"=\"%s\" " "[is read-only=%s, default=%s, sensitive=%s, " - "synonym=%s] with %" PRIusz " synonym(s)\n", + "synonym=%s] with %" PRIusz + " synonym(s) ConfigType: (%d) " + "Documentation: %s\n", indent, ei, entry_cnt, rd_kafka_ConfigSource_name(rd_kafka_ConfigEntry_source(e)), rd_kafka_ConfigEntry_source(e), @@ -693,7 +695,11 @@ static void test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t **entries, YN(rd_kafka_ConfigEntry_is_read_only(e)), YN(rd_kafka_ConfigEntry_is_default(e)), YN(rd_kafka_ConfigEntry_is_sensitive(e)), - YN(rd_kafka_ConfigEntry_is_synonym(e)), syn_cnt); + YN(rd_kafka_ConfigEntry_is_synonym(e)), syn_cnt, + rd_kafka_ConfigEntry_type(e), + rd_kafka_ConfigEntry_documentation(e) + ? rd_kafka_ConfigEntry_documentation(e) + : "(NULL)"); #undef YN if (syn_cnt > 0)