Skip to content

Commit c01592b

Browse files
committed
Implement Consumer Group Offset APIs
This change implements `listConsumerGroupOffsets`, `alterConsumerGroupOffsets`, and `deleteConsumerGroupOffsets` on the admin client. on-behalf-of: @SAP [email protected]
1 parent 12cb67e commit c01592b

File tree

21 files changed

+2105
-141
lines changed

21 files changed

+2105
-141
lines changed

docs/admin.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,45 @@ Options:
128128
| -------- | ------------------------------- | -------------------------------------------------------------------------------- |
129129
| topics | `DescribeLogDirsRequestTopic[]` | Array of topics specifying the topics and partitions for which to describe logs. |
130130

131+
### `listConsumerGroupOffsets(options[, callback])`
132+
133+
Lists committed offsets for consumer groups.
134+
135+
The return value is an array of groups with information about committed offsets per partition per topic.
136+
137+
Options:
138+
139+
| Property | Type | Description |
140+
| ------------- | --------------------------- | --------------------------------------------------------------------------- |
141+
| groups | `OffsetFetchRequestGroup[]` | Array of groups specifying the groups and topics for which to list offsets. |
142+
| requireStable | `boolean` | Whether to require stable offsets. |
143+
144+
### `alterConsumerGroupOffsets(options[, callback])`
145+
146+
Alters committed offsets for a consumer group. Note, that the consumer group must be empty to succeed.
147+
148+
The return value is `void`.
149+
150+
Options:
151+
152+
| Property | Type | Description |
153+
| -------- | ---------------------------------- | ------------------------------------------------------------ |
154+
| groupId | `string` | The consumer group ID for which to alter offsets. |
155+
| topics | `AlterConsumerGroupOffsetsTopic[]` | Array of topics with partitions and their new offset values. |
156+
157+
### `deleteConsumerGroupOffsets(options[, callback])`
158+
159+
Deletes committed offsets for specific topics of a consumer group.
160+
161+
The return value is an array of topic partitions indicating which offsets were deleted.
162+
163+
Options:
164+
165+
| Property | Type | Description |
166+
| -------- | ------------------- | ----------------------------------------------------------- |
167+
| groupId | `string` | The consumer group ID from which to delete offsets. |
168+
| topics | `TopicPartitions[]` | Array of topics and partitions for which to delete offsets. |
169+
131170
### `close([callback])`
132171

133172
Closes the admin and all its connections.

docs/diagnostic.md

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,24 @@ Each tracing channel publishes events with the following common properties:
6262

6363
## Published tracing channels
6464

65-
| Name | Target | Description |
66-
| ----------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------- |
67-
| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. |
68-
| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. |
69-
| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. |
70-
| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. |
71-
| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. |
72-
| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. |
73-
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. |
74-
| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
75-
| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. |
76-
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
77-
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
78-
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
79-
| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. |
80-
| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. |
81-
| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. |
82-
| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. |
83-
| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. |
84-
| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. |
65+
| Name | Target | Description |
66+
| -------------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------- |
67+
| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. |
68+
| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. |
69+
| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. |
70+
| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. |
71+
| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. |
72+
| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. |
73+
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. |
74+
| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
75+
| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. |
76+
| `plt:kafka:admin:consumerGroupOffsets` | `Admin` | Traces a `Admin.listConsumerGroupOffsets` or `Admin.deleteConsumerGroupOffsets` request. |
77+
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
78+
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
79+
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
80+
| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. |
81+
| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. |
82+
| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. |
83+
| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. |
84+
| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. |
85+
| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. |

playground/apis/admin/offset-delete.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,7 @@ await performAPICallWithRetry('OffsetDelete', () =>
99
offsetDeleteV0.async(connection, 'g2', [
1010
{
1111
name: 'temp',
12-
partitions: [
13-
{
14-
partitionIndex: 0
15-
}
16-
]
12+
partitions: [0]
1713
}
1814
]))
1915

playground/apis/consumer/fetch.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { api as fetchV17 } from '../../../src/apis/consumer/fetch-v17.ts'
22
import { api as offsetCommitV9 } from '../../../src/apis/consumer/offset-commit-v9.ts'
3-
import { api as offsetFetchV9 } from '../../../src/apis/consumer/offset-fetch-v9.ts'
3+
import { api as offsetFetchV9 } from '../../../src/apis/admin/offset-fetch-v9.ts'
44
import { api as syncGroupV5 } from '../../../src/apis/consumer/sync-group-v5.ts'
55
import { FetchIsolationLevels, FindCoordinatorKeyTypes } from '../../../src/apis/enumerations.ts'
66
import { api as findCoordinatorV6 } from '../../../src/apis/metadata/find-coordinator-v6.ts'
@@ -45,7 +45,7 @@ const offsetFetch = await performAPICallWithRetry('OffsetFetch', () =>
4545
topics: [
4646
{
4747
name: topicName,
48-
partitionIndexes: [0]
48+
partitions: [0]
4949
}
5050
]
5151
}
@@ -103,11 +103,14 @@ for (let i = 0; i < 3; i++) {
103103
break
104104
}
105105

106-
const { nextOffset, records } = batches.reduce<{ nextOffset: bigint, records: KafkaRecord[] }>((acc, batch) => {
107-
acc.nextOffset = batch.firstOffset + BigInt(batch.records.length)
108-
acc.records.push(...batch.records)
109-
return acc
110-
}, { nextOffset: fetchOffset, records: [] })
106+
const { nextOffset, records } = batches.reduce<{ nextOffset: bigint; records: KafkaRecord[] }>(
107+
(acc, batch) => {
108+
acc.nextOffset = batch.firstOffset + BigInt(batch.records.length)
109+
acc.records.push(...batch.records)
110+
return acc
111+
},
112+
{ nextOffset: fetchOffset, records: [] }
113+
)
111114

112115
fetchOffset = nextOffset
113116
console.log(

src/apis/admin/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,5 @@ export * as offsetDeleteV0 from './offset-delete-v0.ts'
4040
export * as renewDelegationTokenV2 from './renew-delegation-token-v2.ts'
4141
export * as unregisterBrokerV0 from './unregister-broker-v0.ts'
4242
export * as updateFeaturesV1 from './update-features-v1.ts'
43+
export * as offsetFetchV8 from './offset-fetch-v8.ts'
44+
export * as offsetFetchV9 from './offset-fetch-v9.ts'

src/apis/admin/offset-delete-v0.ts

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,7 @@ import { ResponseError } from '../../errors.ts'
22
import { type Reader } from '../../protocol/reader.ts'
33
import { Writer } from '../../protocol/writer.ts'
44
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'
5-
6-
export interface OffsetDeleteRequestPartition {
7-
partitionIndex: number
8-
}
9-
10-
export interface OffsetDeleteRequestTopic {
11-
name: string
12-
partitions: OffsetDeleteRequestPartition[]
13-
}
5+
import { type TopicPartitions } from '../types.ts'
146

157
export type OffsetDeleteRequest = Parameters<typeof createRequest>
168

@@ -38,13 +30,13 @@ export interface OffsetDeleteResponse {
3830
partitions => partition_index
3931
partition_index => INT32
4032
*/
41-
export function createRequest (groupId: string, topics: OffsetDeleteRequestTopic[]): Writer {
33+
export function createRequest (groupId: string, topics: TopicPartitions[]): Writer {
4234
return Writer.create()
4335
.appendString(groupId, false)
4436
.appendArray(
4537
topics,
4638
(w, t) => {
47-
w.appendString(t.name, false).appendArray(t.partitions, (w, p) => w.appendInt32(p.partitionIndex), false, false)
39+
w.appendString(t.name, false).appendArray(t.partitions, (w, p) => w.appendInt32(p), false, false)
4840
},
4941
false,
5042
false
@@ -78,23 +70,31 @@ export function parseResponse (
7870
const response: OffsetDeleteResponse = {
7971
errorCode,
8072
throttleTimeMs: reader.readInt32(),
81-
topics: reader.readArray((r, i) => {
82-
return {
83-
name: r.readString(),
84-
partitions: r.readArray((r, j) => {
85-
const partition = {
86-
partitionIndex: r.readInt32(),
87-
errorCode: r.readInt16()
88-
}
89-
90-
if (partition.errorCode !== 0) {
91-
errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode])
92-
}
93-
94-
return partition
95-
})
96-
}
97-
})
73+
topics: reader.readArray(
74+
(r, i) => {
75+
return {
76+
name: r.readString(false),
77+
partitions: r.readArray(
78+
(r, j) => {
79+
const partition = {
80+
partitionIndex: r.readInt32(),
81+
errorCode: r.readInt16()
82+
}
83+
84+
if (partition.errorCode !== 0) {
85+
errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode])
86+
}
87+
88+
return partition
89+
},
90+
false,
91+
false
92+
)
93+
}
94+
},
95+
false,
96+
false
97+
)
9898
}
9999

100100
if (errors.length) {

src/apis/consumer/offset-fetch-v8.ts renamed to src/apis/admin/offset-fetch-v8.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ResponseError } from '../../errors.ts'
2-
import { type NullableString } from '../../protocol/definitions.ts'
2+
import { type Nullable, type NullableString } from '../../protocol/definitions.ts'
33
import { type Reader } from '../../protocol/reader.ts'
44
import { Writer } from '../../protocol/writer.ts'
55
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'
@@ -13,7 +13,7 @@ export interface OffsetFetchRequestGroup {
1313
groupId: string
1414
memberId?: NullableString
1515
memberEpoch: number
16-
topics: OffsetFetchRequestTopic[]
16+
topics?: Nullable<OffsetFetchRequestTopic[]>
1717
}
1818

1919
export type OffsetFetchRequest = Parameters<typeof createRequest>

src/apis/consumer/offset-fetch-v9.ts renamed to src/apis/admin/offset-fetch-v9.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
import { ResponseError } from '../../errors.ts'
2-
import { type NullableString } from '../../protocol/definitions.ts'
2+
import { type Nullable, type NullableString } from '../../protocol/definitions.ts'
33
import { type Reader } from '../../protocol/reader.ts'
44
import { Writer } from '../../protocol/writer.ts'
55
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'
6-
7-
export interface OffsetFetchRequestTopic {
8-
name: string
9-
partitionIndexes: number[]
10-
}
6+
import { type TopicPartitions } from '../types.ts'
117

128
export interface OffsetFetchRequestGroup {
139
groupId: string
1410
memberId?: NullableString
1511
memberEpoch: number
16-
topics: OffsetFetchRequestTopic[]
12+
topics?: Nullable<TopicPartitions[]>
1713
}
1814

1915
export type OffsetFetchRequest = Parameters<typeof createRequest>
@@ -60,7 +56,7 @@ export function createRequest (groups: OffsetFetchRequestGroup[], requireStable:
6056
.appendString(g.memberId)
6157
.appendInt32(g.memberEpoch)
6258
.appendArray(g.topics, (w, t) => {
63-
w.appendString(t.name).appendArray(t.partitionIndexes, (w, i) => w.appendInt32(i), true, false)
59+
w.appendString(t.name).appendArray(t.partitions, (w, i) => w.appendInt32(i), true, false)
6460
})
6561
})
6662
.appendBoolean(requireStable)

src/apis/consumer/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,5 @@ export * as listOffsetsV8 from './list-offsets-v8.ts'
1212
export * as listOffsetsV9 from './list-offsets-v9.ts'
1313
export * as offsetCommitV8 from './offset-commit-v8.ts'
1414
export * as offsetCommitV9 from './offset-commit-v9.ts'
15-
export * as offsetFetchV8 from './offset-fetch-v8.ts'
16-
export * as offsetFetchV9 from './offset-fetch-v9.ts'
1715
export * as offsetForLeaderEpochV4 from './offset-for-leader-epoch-v4.ts'
1816
export * as syncGroupV5 from './sync-group-v5.ts'

src/apis/consumer/offset-commit-v9.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export interface OffsetCommitResponse {
5050
export function createRequest (
5151
groupId: string,
5252
generationIdOrMemberEpoch: number,
53-
memberId: string,
53+
memberId: NullableString,
5454
groupInstanceId: NullableString,
5555
topics: OffsetCommitRequestTopic[]
5656
): Writer {

0 commit comments

Comments
 (0)