Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,45 @@ Options:
| -------- | ------------------------------- | -------------------------------------------------------------------------------- |
| topics | `DescribeLogDirsRequestTopic[]` | Array of topics specifying the topics and partitions for which to describe logs. |

### `listConsumerGroupOffsets(options[, callback])`

Lists committed offsets for consumer groups.

The return value is an array of groups with information about committed offsets per partition per topic.

Options:

| Property | Type | Description |
| ------------- | --------------------------- | --------------------------------------------------------------------------- |
| groups | `OffsetFetchRequestGroup[]` | Array of groups specifying the groups and topics for which to list offsets. |
| requireStable | `boolean` | Whether to require stable offsets. |

### `alterConsumerGroupOffsets(options[, callback])`

Alters committed offsets for a consumer group. Note, that the consumer group must be empty to succeed.

The return value is `void`.

Options:

| Property | Type | Description |
| -------- | ---------------------------------- | ------------------------------------------------------------ |
| groupId | `string` | The consumer group ID for which to alter offsets. |
| topics | `AlterConsumerGroupOffsetsTopic[]` | Array of topics with partitions and their new offset values. |

### `deleteConsumerGroupOffsets(options[, callback])`

Deletes committed offsets for specific topics of a consumer group.

The return value is an array of topic partitions indicating which offsets were deleted.

Options:

| Property | Type | Description |
| -------- | ------------------- | ----------------------------------------------------------- |
| groupId | `string` | The consumer group ID from which to delete offsets. |
| topics | `TopicPartitions[]` | Array of topics and partitions for which to delete offsets. |

### `close([callback])`

Closes the admin and all its connections.
Expand Down
41 changes: 21 additions & 20 deletions docs/diagnostic.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,24 @@ Each tracing channel publishes events with the following common properties:

## Published tracing channels

| Name | Target | Description |
| ----------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------- |
| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. |
| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. |
| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. |
| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. |
| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. |
| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. |
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. |
| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. |
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. |
| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. |
| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. |
| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. |
| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. |
| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. |
| Name | Target | Description |
| -------------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------- |
| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. |
| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. |
| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. |
| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. |
| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. |
| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. |
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. |
| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. |
| `plt:kafka:admin:consumerGroupOffsets` | `Admin` | Traces a `Admin.listConsumerGroupOffsets` or `Admin.deleteConsumerGroupOffsets` request. |
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. |
| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. |
| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. |
| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. |
| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. |
| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. |
6 changes: 1 addition & 5 deletions playground/apis/admin/offset-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ await performAPICallWithRetry('OffsetDelete', () =>
offsetDeleteV0.async(connection, 'g2', [
{
name: 'temp',
partitions: [
{
partitionIndex: 0
}
]
partitions: [0]
}
]))

Expand Down
17 changes: 10 additions & 7 deletions playground/apis/consumer/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { api as fetchV17 } from '../../../src/apis/consumer/fetch-v17.ts'
import { api as offsetCommitV9 } from '../../../src/apis/consumer/offset-commit-v9.ts'
import { api as offsetFetchV9 } from '../../../src/apis/consumer/offset-fetch-v9.ts'
import { api as offsetFetchV9 } from '../../../src/apis/admin/offset-fetch-v9.ts'
import { api as syncGroupV5 } from '../../../src/apis/consumer/sync-group-v5.ts'
import { FetchIsolationLevels, FindCoordinatorKeyTypes } from '../../../src/apis/enumerations.ts'
import { api as findCoordinatorV6 } from '../../../src/apis/metadata/find-coordinator-v6.ts'
Expand Down Expand Up @@ -45,7 +45,7 @@ const offsetFetch = await performAPICallWithRetry('OffsetFetch', () =>
topics: [
{
name: topicName,
partitionIndexes: [0]
partitions: [0]
}
]
}
Expand Down Expand Up @@ -103,11 +103,14 @@ for (let i = 0; i < 3; i++) {
break
}

const { nextOffset, records } = batches.reduce<{ nextOffset: bigint, records: KafkaRecord[] }>((acc, batch) => {
acc.nextOffset = batch.firstOffset + BigInt(batch.records.length)
acc.records.push(...batch.records)
return acc
}, { nextOffset: fetchOffset, records: [] })
const { nextOffset, records } = batches.reduce<{ nextOffset: bigint; records: KafkaRecord[] }>(
(acc, batch) => {
acc.nextOffset = batch.firstOffset + BigInt(batch.records.length)
acc.records.push(...batch.records)
return acc
},
{ nextOffset: fetchOffset, records: [] }
)

fetchOffset = nextOffset
console.log(
Expand Down
2 changes: 2 additions & 0 deletions src/apis/admin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@ export * as offsetDeleteV0 from './offset-delete-v0.ts'
export * as renewDelegationTokenV2 from './renew-delegation-token-v2.ts'
export * as unregisterBrokerV0 from './unregister-broker-v0.ts'
export * as updateFeaturesV1 from './update-features-v1.ts'
export * as offsetFetchV8 from './offset-fetch-v8.ts'
export * as offsetFetchV9 from './offset-fetch-v9.ts'
56 changes: 28 additions & 28 deletions src/apis/admin/offset-delete-v0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,7 @@ import { ResponseError } from '../../errors.ts'
import { type Reader } from '../../protocol/reader.ts'
import { Writer } from '../../protocol/writer.ts'
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'

export interface OffsetDeleteRequestPartition {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert these types change.

partitionIndex: number
}

export interface OffsetDeleteRequestTopic {
name: string
partitions: OffsetDeleteRequestPartition[]
}
import { type TopicPartitions } from '../types.ts'

export type OffsetDeleteRequest = Parameters<typeof createRequest>

Expand Down Expand Up @@ -38,13 +30,13 @@ export interface OffsetDeleteResponse {
partitions => partition_index
partition_index => INT32
*/
export function createRequest (groupId: string, topics: OffsetDeleteRequestTopic[]): Writer {
export function createRequest (groupId: string, topics: TopicPartitions[]): Writer {
return Writer.create()
.appendString(groupId, false)
.appendArray(
topics,
(w, t) => {
w.appendString(t.name, false).appendArray(t.partitions, (w, p) => w.appendInt32(p.partitionIndex), false, false)
w.appendString(t.name, false).appendArray(t.partitions, (w, p) => w.appendInt32(p), false, false)
},
false,
false
Expand Down Expand Up @@ -78,23 +70,31 @@ export function parseResponse (
const response: OffsetDeleteResponse = {
errorCode,
throttleTimeMs: reader.readInt32(),
topics: reader.readArray((r, i) => {
return {
name: r.readString(),
partitions: r.readArray((r, j) => {
const partition = {
partitionIndex: r.readInt32(),
errorCode: r.readInt16()
}

if (partition.errorCode !== 0) {
errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode])
}

return partition
})
}
})
topics: reader.readArray(
(r, i) => {
return {
name: r.readString(false),
partitions: r.readArray(
(r, j) => {
const partition = {
partitionIndex: r.readInt32(),
errorCode: r.readInt16()
}

if (partition.errorCode !== 0) {
errors.push([`/topics/${i}/partitions/${j}`, partition.errorCode])
}

return partition
},
false,
false
)
}
},
false,
false
)
}

if (errors.length) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import { ResponseError } from '../../errors.ts'
import { type NullableString } from '../../protocol/definitions.ts'
import { type Nullable, type NullableString } from '../../protocol/definitions.ts'
import { type Reader } from '../../protocol/reader.ts'
import { Writer } from '../../protocol/writer.ts'
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'

export interface OffsetFetchRequestTopic {
name: string
partitionIndexes: number[]
}
import { type TopicPartitions } from '../types.ts'

export interface OffsetFetchRequestGroup {
groupId: string
memberId?: NullableString
memberEpoch: number
topics: OffsetFetchRequestTopic[]
topics?: Nullable<TopicPartitions[]>
}

export type OffsetFetchRequest = Parameters<typeof createRequest>
Expand Down Expand Up @@ -57,7 +53,7 @@ export function createRequest (groups: OffsetFetchRequestGroup[], requireStable:
return Writer.create()
.appendArray(groups, (w, g) => {
w.appendString(g.groupId).appendArray(g.topics, (w, t) => {
w.appendString(t.name).appendArray(t.partitionIndexes, (w, i) => w.appendInt32(i), true, false)
w.appendString(t.name).appendArray(t.partitions, (w, i) => w.appendInt32(i), true, false)
})
})
.appendBoolean(requireStable)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import { ResponseError } from '../../errors.ts'
import { type NullableString } from '../../protocol/definitions.ts'
import { type Nullable, type NullableString } from '../../protocol/definitions.ts'
import { type Reader } from '../../protocol/reader.ts'
import { Writer } from '../../protocol/writer.ts'
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'

export interface OffsetFetchRequestTopic {
name: string
partitionIndexes: number[]
}
import { type TopicPartitions } from '../types.ts'

export interface OffsetFetchRequestGroup {
groupId: string
memberId?: NullableString
memberEpoch: number
topics: OffsetFetchRequestTopic[]
topics?: Nullable<TopicPartitions[]>
}

export type OffsetFetchRequest = Parameters<typeof createRequest>
Expand Down Expand Up @@ -60,7 +56,7 @@ export function createRequest (groups: OffsetFetchRequestGroup[], requireStable:
.appendString(g.memberId)
.appendInt32(g.memberEpoch)
.appendArray(g.topics, (w, t) => {
w.appendString(t.name).appendArray(t.partitionIndexes, (w, i) => w.appendInt32(i), true, false)
w.appendString(t.name).appendArray(t.partitions, (w, i) => w.appendInt32(i), true, false)
})
})
.appendBoolean(requireStable)
Expand Down
2 changes: 0 additions & 2 deletions src/apis/consumer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,5 @@ export * as listOffsetsV8 from './list-offsets-v8.ts'
export * as listOffsetsV9 from './list-offsets-v9.ts'
export * as offsetCommitV8 from './offset-commit-v8.ts'
export * as offsetCommitV9 from './offset-commit-v9.ts'
export * as offsetFetchV8 from './offset-fetch-v8.ts'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert these file movings.

export * as offsetFetchV9 from './offset-fetch-v9.ts'
export * as offsetForLeaderEpochV4 from './offset-for-leader-epoch-v4.ts'
export * as syncGroupV5 from './sync-group-v5.ts'
2 changes: 1 addition & 1 deletion src/apis/consumer/offset-commit-v9.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export interface OffsetCommitResponse {
export function createRequest (
groupId: string,
generationIdOrMemberEpoch: number,
memberId: string,
memberId: NullableString,
groupInstanceId: NullableString,
topics: OffsetCommitRequestTopic[]
): Writer {
Expand Down
1 change: 1 addition & 0 deletions src/apis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
export * from './callbacks.ts'
export * from './definitions.ts'
export * from './enumerations.ts'
export * from './types.ts'

// Low-level APIs
export * from './admin/index.ts'
Expand Down
4 changes: 4 additions & 0 deletions src/apis/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface TopicPartitions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert anything related to this type.

name: string
partitions: number[]
}
Loading