Skip to content

Commit d8b899c

Browse files
authored
feat: Added Azure Event Hub support. (#165)
Signed-off-by: Paolo Insogna <[email protected]>
1 parent 91a985f commit d8b899c

35 files changed

+9775
-24
lines changed
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { ResponseError } from '../../errors.ts'
2+
import { type NullableString } from '../../protocol/definitions.ts'
3+
import { type Reader } from '../../protocol/reader.ts'
4+
import { Writer } from '../../protocol/writer.ts'
5+
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'
6+
7+
export interface CreatePartitionsRequestAssignment {
8+
brokerIds: number[]
9+
}
10+
11+
export interface CreatePartitionsRequestTopic {
12+
name: string
13+
count: number
14+
assignments: CreatePartitionsRequestAssignment[]
15+
}
16+
17+
export type CreatePartitionsRequest = Parameters<typeof createRequest>
18+
19+
export interface CreatePartitionsResponseResult {
20+
name: string
21+
errorCode: number
22+
errorMessage: NullableString
23+
}
24+
25+
export interface CreatePartitionsResponse {
26+
throttleTimeMs: number
27+
results: CreatePartitionsResponseResult[]
28+
}
29+
30+
/*
31+
CreatePartitions Request (Version: 1) => [topics] timeout_ms validate_only TAG_BUFFER
32+
topics => name count [assignments] TAG_BUFFER
33+
name => STRING
34+
count => INT32
35+
assignments => [broker_ids] TAG_BUFFER
36+
broker_ids => INT32
37+
timeout_ms => INT32
38+
validate_only => BOOLEAN
39+
*/
40+
export function createRequest (
41+
topics: CreatePartitionsRequestTopic[],
42+
timeoutMs: number,
43+
validateOnly: boolean
44+
): Writer {
45+
return Writer.create()
46+
.appendArray(
47+
topics,
48+
(w, t) => {
49+
w.appendString(t.name, false)
50+
.appendInt32(t.count)
51+
.appendArray(
52+
t.assignments,
53+
(w, a) => w.appendArray(a.brokerIds, (w, b) => w.appendInt32(b), false, false),
54+
false,
55+
false
56+
)
57+
},
58+
false,
59+
false
60+
)
61+
.appendInt32(timeoutMs)
62+
.appendBoolean(validateOnly)
63+
}
64+
65+
/*
66+
CreatePartitions Response (Version: 1) => throttle_time_ms [results] TAG_BUFFER
67+
throttle_time_ms => INT32
68+
results => name error_code error_message TAG_BUFFER
69+
name => STRING
70+
error_code => INT16
71+
error_message => NULLABLE_STRING
72+
*/
73+
export function parseResponse (
74+
_correlationId: number,
75+
apiKey: number,
76+
apiVersion: number,
77+
reader: Reader
78+
): CreatePartitionsResponse {
79+
const errors: ResponseErrorWithLocation[] = []
80+
81+
const response: CreatePartitionsResponse = {
82+
throttleTimeMs: reader.readInt32(),
83+
results: reader.readArray(
84+
(r, i) => {
85+
const result = {
86+
name: r.readString(false),
87+
errorCode: r.readInt16(),
88+
errorMessage: r.readNullableString(false)
89+
}
90+
91+
if (result.errorCode !== 0) {
92+
errors.push([`/results/${i}`, result.errorCode])
93+
}
94+
95+
return result
96+
},
97+
false,
98+
false
99+
)
100+
}
101+
102+
if (errors.length) {
103+
throw new ResponseError(apiKey, apiVersion, Object.fromEntries(errors), response)
104+
}
105+
106+
return response
107+
}
108+
109+
export const api = createAPI<CreatePartitionsRequest, CreatePartitionsResponse>(
110+
37,
111+
1,
112+
createRequest,
113+
parseResponse,
114+
false,
115+
false
116+
)
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import { ResponseError } from '../../errors.ts'
2+
import { type NullableString } from '../../protocol/definitions.ts'
3+
import { type Reader } from '../../protocol/reader.ts'
4+
import { Writer } from '../../protocol/writer.ts'
5+
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'
6+
7+
export interface CreatePartitionsRequestAssignment {
8+
brokerIds: number[]
9+
}
10+
11+
export interface CreatePartitionsRequestTopic {
12+
name: string
13+
count: number
14+
assignments: CreatePartitionsRequestAssignment[]
15+
}
16+
17+
export type CreatePartitionsRequest = Parameters<typeof createRequest>
18+
19+
export interface CreatePartitionsResponseResult {
20+
name: string
21+
errorCode: number
22+
errorMessage: NullableString
23+
}
24+
25+
export interface CreatePartitionsResponse {
26+
throttleTimeMs: number
27+
results: CreatePartitionsResponseResult[]
28+
}
29+
30+
/*
31+
CreatePartitions Request (Version: 2) => [topics] timeout_ms validate_only TAG_BUFFER
32+
topics => name count [assignments] TAG_BUFFER
33+
name => COMPACT_STRING
34+
count => INT32
35+
assignments => [broker_ids] TAG_BUFFER
36+
broker_ids => INT32
37+
timeout_ms => INT32
38+
validate_only => BOOLEAN
39+
*/
40+
export function createRequest (
41+
topics: CreatePartitionsRequestTopic[],
42+
timeoutMs: number,
43+
validateOnly: boolean
44+
): Writer {
45+
return Writer.create()
46+
.appendArray(topics, (w, t) => {
47+
w.appendString(t.name)
48+
.appendInt32(t.count)
49+
.appendArray(t.assignments, (w, a) => w.appendArray(a.brokerIds, (w, b) => w.appendInt32(b), true, false))
50+
})
51+
.appendInt32(timeoutMs)
52+
.appendBoolean(validateOnly)
53+
.appendTaggedFields()
54+
}
55+
56+
/*
57+
CreatePartitions Response (Version: 2) => throttle_time_ms [results] TAG_BUFFER
58+
throttle_time_ms => INT32
59+
results => name error_code error_message TAG_BUFFER
60+
name => COMPACT_STRING
61+
error_code => INT16
62+
error_message => COMPACT_NULLABLE_STRING
63+
*/
64+
export function parseResponse (
65+
_correlationId: number,
66+
apiKey: number,
67+
apiVersion: number,
68+
reader: Reader
69+
): CreatePartitionsResponse {
70+
const errors: ResponseErrorWithLocation[] = []
71+
72+
const response: CreatePartitionsResponse = {
73+
throttleTimeMs: reader.readInt32(),
74+
results: reader.readArray((r, i) => {
75+
const result = {
76+
name: r.readString(),
77+
errorCode: r.readInt16(),
78+
errorMessage: r.readNullableString()
79+
}
80+
81+
if (result.errorCode !== 0) {
82+
errors.push([`/results/${i}`, result.errorCode])
83+
}
84+
85+
return result
86+
})
87+
}
88+
89+
if (errors.length) {
90+
throw new ResponseError(apiKey, apiVersion, Object.fromEntries(errors), response)
91+
}
92+
93+
return response
94+
}
95+
96+
export const api = createAPI<CreatePartitionsRequest, CreatePartitionsResponse>(37, 2, createRequest, parseResponse)
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import { ResponseError } from '../../errors.ts'
2+
import { type NullableString } from '../../protocol/definitions.ts'
3+
import { type Reader } from '../../protocol/reader.ts'
4+
import { Writer } from '../../protocol/writer.ts'
5+
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'
6+
7+
export interface DescribeConfigsRequestResource {
8+
resourceType: number
9+
resourceName: string
10+
configurationKeys: string[]
11+
}
12+
13+
export type DescribeConfigsRequest = Parameters<typeof createRequest>
14+
15+
export interface DescribeConfigsResponseSynonym {
16+
name: string
17+
value: NullableString
18+
source: number
19+
}
20+
21+
export interface DescribeConfigsResponseConfig {
22+
name: string
23+
value: NullableString
24+
readOnly: boolean
25+
configSource: number
26+
isSensitive: boolean
27+
synonyms: DescribeConfigsResponseSynonym[]
28+
configType: number
29+
documentation: NullableString
30+
}
31+
32+
export interface DescribeConfigsResponseResult {
33+
errorCode: number
34+
errorMessage: NullableString
35+
resourceType: number
36+
resourceName: string
37+
configs: DescribeConfigsResponseConfig[]
38+
}
39+
40+
export interface DescribeConfigsResponse {
41+
throttleTimeMs: number
42+
results: DescribeConfigsResponseResult[]
43+
}
44+
45+
/*
46+
DescribeConfigs Request (Version: 2) => [resources] include_synonyms include_documentation TAG_BUFFER
47+
resources => resource_type resource_name [configuration_keys] TAG_BUFFER
48+
resource_type => INT8
49+
resource_name => STRING
50+
configuration_keys => STRING
51+
include_synonyms => BOOLEAN
52+
include_documentation => BOOLEAN
53+
*/
54+
export function createRequest (
55+
resources: DescribeConfigsRequestResource[],
56+
includeSynonyms: boolean,
57+
includeDocumentation: boolean
58+
): Writer {
59+
return Writer.create()
60+
.appendArray(
61+
resources,
62+
(w, r) => {
63+
w.appendInt8(r.resourceType)
64+
.appendString(r.resourceName, false)
65+
.appendArray(r.configurationKeys, (w, c) => w.appendString(c, false), false, false)
66+
},
67+
false,
68+
false
69+
)
70+
.appendBoolean(includeSynonyms)
71+
.appendBoolean(includeDocumentation)
72+
}
73+
74+
/*
75+
DescribeConfigs Response (Version: 2) => throttle_time_ms [results] TAG_BUFFER
76+
throttle_time_ms => INT32
77+
results => error_code error_message resource_type resource_name [configs] TAG_BUFFER
78+
error_code => INT16
79+
error_message => NULLABLE_STRING
80+
resource_type => INT8
81+
resource_name => STRING
82+
configs => name value read_only config_source is_sensitive [synonyms] config_type documentation TAG_BUFFER
83+
name => STRING
84+
value => NULLABLE_STRING
85+
read_only => BOOLEAN
86+
config_source => INT8
87+
is_sensitive => BOOLEAN
88+
synonyms => name value source TAG_BUFFER
89+
name => STRING
90+
value => NULLABLE_STRING
91+
source => INT8
92+
config_type => INT8
93+
documentation => NULLABLE_STRING
94+
*/
95+
export function parseResponse (
96+
_correlationId: number,
97+
apiKey: number,
98+
apiVersion: number,
99+
reader: Reader
100+
): DescribeConfigsResponse {
101+
const errors: ResponseErrorWithLocation[] = []
102+
103+
const response: DescribeConfigsResponse = {
104+
throttleTimeMs: reader.readInt32(),
105+
results: reader.readArray(
106+
(r, i) => {
107+
const errorCode = r.readInt16()
108+
109+
if (errorCode !== 0) {
110+
errors.push([`/results/${i}`, errorCode])
111+
}
112+
113+
return {
114+
errorCode,
115+
errorMessage: r.readNullableString(false),
116+
resourceType: r.readInt8(),
117+
resourceName: r.readString(false),
118+
configs: r.readArray(
119+
r => {
120+
return {
121+
name: r.readString(false),
122+
value: r.readNullableString(false),
123+
readOnly: r.readBoolean(),
124+
configSource: r.readInt8(),
125+
isSensitive: r.readBoolean(),
126+
synonyms: r.readArray(
127+
r => {
128+
return {
129+
name: r.readString(false),
130+
value: r.readNullableString(false),
131+
source: r.readInt8()
132+
}
133+
},
134+
false,
135+
false
136+
),
137+
configType: r.readInt8(),
138+
documentation: r.readNullableString(false)
139+
}
140+
},
141+
false,
142+
false
143+
)
144+
}
145+
},
146+
false,
147+
false
148+
)
149+
}
150+
151+
if (errors.length) {
152+
throw new ResponseError(apiKey, apiVersion, Object.fromEntries(errors), response)
153+
}
154+
155+
return response
156+
}
157+
158+
export const api = createAPI<DescribeConfigsRequest, DescribeConfigsResponse>(
159+
32,
160+
2,
161+
createRequest,
162+
parseResponse,
163+
false,
164+
false
165+
)

0 commit comments

Comments
 (0)