Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,11 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk,
rd_kafka_cgrp_t *rkcg = opaque;
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode = 0;
int32_t member_cnt;
int i;
rd_kafkap_str_t MemberId;
rd_kafkap_str_t GroupInstanceId;
int16_t MemberErrorCode;

if (err) {
ErrorCode = err;
Expand All @@ -979,6 +984,28 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk,

rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

if (request->rkbuf_reqhdr.ApiVersion >= 3) {
rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt,
RD_KAFKAP_TOPICS_MAX);
for (i = 0; i < member_cnt; i++) {
rd_kafka_buf_read_str(rkbuf, &MemberId);
rd_kafka_buf_read_str(rkbuf, &GroupInstanceId);
rd_kafka_buf_read_i16(rkbuf, &MemberErrorCode);

if (request->rkbuf_reqhdr.ApiVersion >= 4) {
rd_kafka_buf_skip_tags(rkbuf);
}

if (ErrorCode == 0 && MemberErrorCode != 0) {
ErrorCode = MemberErrorCode;
}
}
}

if (request->rkbuf_reqhdr.ApiVersion >= 4) {
rd_kafka_buf_skip_tags(rkbuf);
}

err:
if (ErrorCode)
rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
Expand Down Expand Up @@ -1124,6 +1151,7 @@ static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) {

static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) {
char *member_id;
int member_cnt = 1;
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable member_cnt is declared but never modified or used meaningfully. Consider removing it or using it to make the code more maintainable if it's intended for future use.

Copilot uses AI. Check for mistakes.

RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id);

Expand All @@ -1148,12 +1176,19 @@ static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) {
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;

if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) {
rd_kafkap_str_t *member_id_new;
member_id_new = rd_kafkap_str_new(member_id, -1);

rd_kafka_leave_member_t member = {
.member_id = member_id_new,
.group_instance_id = rkcg->rkcg_group_instance_id};
rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
"Leaving group");
rd_kafka_LeaveGroupRequest(
rkcg->rkcg_coord, rkcg->rkcg_group_id->str, member_id,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rkcg->rkcg_coord, rkcg->rkcg_group_id->str, &member,
member_cnt, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rd_kafka_cgrp_handle_LeaveGroup, rkcg);
rd_kafkap_str_destroy(member_id_new);
} else
rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_coord,
RD_KAFKA_RESP_ERR__WAIT_COORD,
Expand Down
77 changes: 65 additions & 12 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -1746,13 +1746,38 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn,
rd_kafka_mock_broker_t *mrkb;
const rd_bool_t log_decode_errors = rd_true;
rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf);
rd_kafkap_str_t GroupId, MemberId;
rd_kafkap_str_t GroupId;
int32_t member_cnt = 1;
rd_kafkap_str_t *members_id;
rd_kafkap_str_t *members_instance_id;
rd_kafka_resp_err_t *member_errors;
rd_kafka_resp_err_t err;
rd_kafka_mock_cgrp_classic_t *mcgrp;
rd_kafka_mock_cgrp_classic_member_t *member = NULL;
int i;

rd_kafka_buf_read_str(rkbuf, &GroupId);
rd_kafka_buf_read_str(rkbuf, &MemberId);

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) {
rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt,
RD_KAFKAP_TOPICS_MAX);

members_id = rd_alloca(member_cnt * sizeof(*members_id));
members_instance_id =
rd_alloca(member_cnt * sizeof(*members_instance_id));

for (i = 0; i < member_cnt; i++) {
rd_kafka_buf_read_str(rkbuf, &members_id[i]);
rd_kafka_buf_read_str(rkbuf, &members_instance_id[i]);

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4)
rd_kafka_buf_skip_tags(rkbuf);
}
} else {
members_id = rd_alloca(sizeof(*members_id));
members_instance_id = NULL;
rd_kafka_buf_read_str(rkbuf, &members_id[0]);
}

/*
* Construct response
Expand Down Expand Up @@ -1781,21 +1806,49 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn,
err = RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND;
}

member_errors = rd_alloca(member_cnt * sizeof(*member_errors));

if (!err) {
member =
rd_kafka_mock_cgrp_classic_member_find(mcgrp, &MemberId);
if (!member)
err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
for (i = 0; i < member_cnt; i++) {
member = rd_kafka_mock_cgrp_classic_member_find(
mcgrp, &members_id[i]);
if (!member) {
member_errors[i] =
RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
} else {
member_errors[i] =
rd_kafka_mock_cgrp_classic_check_state(
mcgrp, member, rkbuf, -1);
if (!member_errors[i])
rd_kafka_mock_cgrp_classic_member_leave(
mcgrp, member);
}

/* For v0-2, promote first member error to top-level */
if (rkbuf->rkbuf_reqhdr.ApiVersion < 3 &&
member_errors[i] && !err)
err = member_errors[i];
}
} else {
for (i = 0; i < member_cnt; i++) {
member_errors[i] = err;
}
}

if (!err)
err = rd_kafka_mock_cgrp_classic_check_state(mcgrp, member,
rkbuf, -1);
/* Write top-level error code */
rd_kafka_buf_write_i16(resp, err);

if (!err)
rd_kafka_mock_cgrp_classic_member_leave(mcgrp, member);
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) {
rd_kafka_buf_write_arraycnt(resp, member_cnt);
for (i = 0; i < member_cnt; i++) {
rd_kafka_buf_write_kstr(resp, &members_id[i]);
rd_kafka_buf_write_kstr(resp, &members_instance_id[i]);
rd_kafka_buf_write_i16(resp, member_errors[i]);

rd_kafka_buf_write_i16(resp, err); /* ErrorCode */
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4)
rd_kafka_buf_write_tags_empty(resp);
}
}

rd_kafka_mock_connection_send_response(mconn, resp);

Expand Down
78 changes: 22 additions & 56 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2147,21 +2147,39 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
*/
void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
const char *group_id,
const char *member_id,
const rd_kafka_leave_member_t *members,
int member_cnt,
rd_kafka_replyq_t replyq,
rd_kafka_resp_cb_t *resp_cb,
void *opaque) {
rd_kafka_buf_t *rkbuf;
int16_t ApiVersion = 0;
int features;
int i;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_LeaveGroup, 0, 1, &features);
rkb, RD_KAFKAP_LeaveGroup, 0, 4, &features);

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300);
rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_LeaveGroup, 1,
300, ApiVersion >= 4);

rd_kafka_buf_write_str(rkbuf, group_id, -1);
rd_kafka_buf_write_str(rkbuf, member_id, -1);

if (ApiVersion >= 3) {
rd_kafka_buf_write_arraycnt(rkbuf, member_cnt);
for (i = 0; i < member_cnt; i++) {
rd_kafka_buf_write_kstr(rkbuf, members[i].member_id);
rd_kafka_buf_write_kstr(rkbuf,
members[i].group_instance_id);
if (ApiVersion >= 4) {
rd_kafka_buf_write_tags_empty(rkbuf);
}
}
} else {
/* v0-2: Only supports single member */
rd_assert(member_cnt == 1);
rd_kafka_buf_write_kstr(rkbuf, members[0].member_id);
}

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

Expand All @@ -2176,58 +2194,6 @@ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
}


/**
* Handler for LeaveGroup responses
* opaque must be the cgrp handle.
*/
void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_cgrp_t *rkcg = opaque;
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode = 0;
int actions;

if (err) {
ErrorCode = err;
goto err;
}

rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

err:
actions = rd_kafka_err_action(rkb, ErrorCode, request,
RD_KAFKA_ERR_ACTION_END);

if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Re-query for coordinator */
rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
RD_KAFKA_OP_COORD_QUERY, ErrorCode);
}

if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
if (rd_kafka_buf_retry(rkb, request))
return;
/* FALLTHRU */
}

if (ErrorCode)
rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
"LeaveGroup response: %s",
rd_kafka_err2str(ErrorCode));

return;

err_parse:
ErrorCode = rkbuf->rkbuf_err;
goto err;
}



/**
* Send HeartbeatRequest
*/
Expand Down
12 changes: 11 additions & 1 deletion src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ typedef struct rd_kafkap_Fetch_reply_tags_s {

/**@}*/

/**
* @brief LeaveGroup memebr identity (for LeaveGroup version 3)
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'memebr' to 'member'.

Suggested change
* @brief LeaveGroup memebr identity (for LeaveGroup version 3)
* @brief LeaveGroup member identity (for LeaveGroup version 3)

Copilot uses AI. Check for mistakes.
*/
typedef struct rd_kafka_leave_member_s {
rd_kafkap_str_t *member_id;
rd_kafkap_str_t *group_instance_id;
} rd_kafka_leave_member_t;


rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions(
rd_kafka_buf_t *rkbuf,
rd_bool_t use_topic_id,
Expand Down Expand Up @@ -326,7 +335,8 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,

void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
const char *group_id,
const char *member_id,
const rd_kafka_leave_member_t *members,
int member_cnt,
rd_kafka_replyq_t replyq,
rd_kafka_resp_cb_t *resp_cb,
void *opaque);
Expand Down