Skip to content

Commit 8f35fa4

Browse files
committed
(chore): migrate to using confluent's kafka library
1 parent eaa26e8 commit 8f35fa4

15 files changed

+45
-45
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
# kafka-wrapper
2-
A simple kafka wrapper for `node-rdkafka` client.
2+
A simple kafka wrapper for `@confluentinc/kafka-javascript` client.

lib/admin.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/// <reference types="node" />
22
import Client from './client';
3-
import { GlobalConfig } from 'node-rdkafka';
3+
import { GlobalConfig } from '@confluentinc/kafka-javascript';
44
import EventEmitter from 'events';
55
declare class KafkaAdmin extends Client {
66
private adminClient;

lib/admin.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/client.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/// <reference types="node" />
22
import EventEmitter from "events";
3-
import { GlobalConfig, LibrdKafkaError, TopicConfig } from "node-rdkafka";
3+
import { GlobalConfig, LibrdKafkaError, TopicConfig } from "@confluentinc/kafka-javascript";
44
export default class Client {
55
private clientId;
66
private clientType;

lib/consumer.d.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// <reference types="node" />
2-
import { ConsumerGlobalConfig, ConsumerTopicConfig, LibrdKafkaError, Message, SubscribeTopicList } from 'node-rdkafka';
2+
import { ConsumerGlobalConfig, ConsumerTopicConfig, LibrdKafkaError, Message, SubscribeTopicList } from '@confluentinc/kafka-javascript';
33
import { EventEmitter } from 'stream';
44
import Client from './client';
55
export declare type ConsumeActionFunction = (err: LibrdKafkaError, messages: Message[]) => void;
@@ -10,8 +10,8 @@ declare class KafkaConsumer extends Client {
1010
* Initializes a KafkaConsumer.
1111
* @param {String} clientId: id to identify a client consuming the message.
1212
* @param {String} groupId: consumer group id, the consumer belongs to.
13-
* @param {import('node-rdkafka').ConsumerGlobalConfig} config: configs for consumer.
14-
* @param {import('node-rdkafka').ConsumerTopicConfig} topicConfig: topic configs
13+
* @param {import('@confluentinc/kafka-javascript').ConsumerGlobalConfig} config: configs for consumer.
14+
* @param {import('@confluentinc/kafka-javascript').ConsumerTopicConfig} topicConfig: topic configs
1515
* @param {EventEmitter} emitter: to emit log events
1616
*/
1717
constructor(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: EventEmitter);
@@ -24,7 +24,7 @@ declare class KafkaConsumer extends Client {
2424
connect(): Promise<this | LibrdKafkaError>;
2525
/**
2626
* Subscribe to topics.
27-
* @param {import('node-rdkafka').SubscribeTopicList} topics: array of topic names.
27+
* @param {import('@confluentinc/kafka-javascript').SubscribeTopicList} topics: array of topic names.
2828
* @returns {KafkaConsumer}
2929
*/
3030
subscribe(topics: SubscribeTopicList): this;
@@ -62,7 +62,7 @@ declare class KafkaConsumer extends Client {
6262
_wrapListenCallbackWrapper(actionOnData: any): (msg: any) => void;
6363
/**
6464
* Parses message before passing it to consumer callback.
65-
* @param {Object} msg - expects it to be in node-rdkafka msg format.
65+
* @param {Object} msg - expects it to be in @confluentinc/kafka-javascript msg format.
6666
* @returns
6767
*/
6868
_parseMessage(msg: any): any;

lib/consumer.js

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/index.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import getKafkaProducer, { ProduceParameters } from './producer';
22
import getKafkaConsumer, { ConsumeActionFunction, ListenActionFunction } from './consumer';
33
import KafkaAdmin from './admin';
4-
import { ClientMetrics, LibrdKafkaError, NumberNullUndefined, SubscribeTopicList } from 'node-rdkafka';
4+
import { ClientMetrics, LibrdKafkaError, NumberNullUndefined, SubscribeTopicList } from '@confluentinc/kafka-javascript';
55
import { ErrorHandlingFunction } from './client';
66
interface KafkaConsumer {
77
connect(): Promise<KafkaConsumer | LibrdKafkaError>;

lib/producer.d.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/// <reference types="node" />
22
import EventEmitter from 'events';
3-
import { ClientMetrics, LibrdKafkaError, MessageKey, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig } from 'node-rdkafka';
3+
import { ClientMetrics, LibrdKafkaError, MessageKey, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig } from '@confluentinc/kafka-javascript';
44
import Client, { ErrorHandlingFunction } from './client';
55
export interface ProduceParameters {
66
topic: string;
@@ -14,8 +14,8 @@ declare class KafkaProducer extends Client {
1414
/**
1515
* Initializes a KafkaProducer.
1616
* @param {String} clientId: id to identify a client producing the message.
17-
* @param {import('node-rdkafka').ProducerGlobalConfig} config: configs for producer.
18-
* @param {import('node-rdkafka').ProducerTopicConfig} topicConfig: topic configs.
17+
* @param {import('@confluentinc/kafka-javascript').ProducerGlobalConfig} config: configs for producer.
18+
* @param {import('@confluentinc/kafka-javascript').ProducerTopicConfig} topicConfig: topic configs.
1919
* @param {EventEmitter} emitter: to emit log messages
2020
*/
2121
constructor(clientId: string, config: ProducerGlobalConfig, topicConfig: ProducerTopicConfig, emitter: EventEmitter);
@@ -29,17 +29,17 @@ declare class KafkaProducer extends Client {
2929
/**
3030
* Produce a message to a topic-partition.
3131
* @param {String} topic: name of topic
32-
* @param {import('node-rdkafka').NumberNullUndefined} partition: partition number to produce to.
32+
* @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} partition: partition number to produce to.
3333
* @param {any} message: message to be produced.
34-
* @param {import('node-rdkafka').MessageKey} key: key associated with the message.
35-
* @param {import('node-rdkafka').NumberNullUndefined} timestamp: timestamp to send with the message.
34+
* @param {import('@confluentinc/kafka-javascript').MessageKey} key: key associated with the message.
35+
* @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} timestamp: timestamp to send with the message.
3636
* @returns {import('../types').BooleanOrNumber}: returns boolean or librdkafka error code.
3737
*/
3838
produce({ topic, message, partition, key, timestamp }: ProduceParameters): boolean | number;
3939
/**
4040
* Flush everything on the internal librdkafka buffer.
4141
* Good to perform before disconnect.
42-
* @param {import('node-rdkafka').NumberNullUndefined}} timeout
42+
* @param {import('@confluentinc/kafka-javascript').NumberNullUndefined}} timeout
4343
* @param {import('../types').ErrorHandlingFunction} postFlushAction
4444
* @returns {KafkaProducer}
4545
*/

lib/producer.js

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
],
1919
"license": "ISC",
2020
"dependencies": {
21-
"node-rdkafka": "^2.18.0"
21+
"@confluentinc/kafka-javascript": "^1.3.1"
2222
},
2323
"devDependencies": {
2424
"@types/node": "17.0.21",

src/admin.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import Client from './client';
2-
import { AdminClient, GlobalConfig } from 'node-rdkafka';
2+
import { AdminClient, GlobalConfig } from '@confluentinc/kafka-javascript';
33
import EventEmitter from 'events';
44

55
class KafkaAdmin extends Client {

src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import EventEmitter from "events";
2-
import { GlobalConfig, LibrdKafkaError, TopicConfig } from "node-rdkafka";
2+
import { GlobalConfig, LibrdKafkaError, TopicConfig } from "@confluentinc/kafka-javascript";
33

44
export default class Client {
55
constructor(private clientId: string,

src/consumer.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import Kafka, { ConsumerGlobalConfig, ConsumerTopicConfig, LibrdKafkaError, Message, SubscribeTopicList } from 'node-rdkafka';
1+
import Kafka, { ConsumerGlobalConfig, ConsumerTopicConfig, LibrdKafkaError, Message, SubscribeTopicList } from '@confluentinc/kafka-javascript';
22
import { EventEmitter } from 'stream';
33
import Client from './client';
44

@@ -15,8 +15,8 @@ class KafkaConsumer extends Client {
1515
* Initializes a KafkaConsumer.
1616
* @param {String} clientId: id to identify a client consuming the message.
1717
* @param {String} groupId: consumer group id, the consumer belongs to.
18-
* @param {import('node-rdkafka').ConsumerGlobalConfig} config: configs for consumer.
19-
* @param {import('node-rdkafka').ConsumerTopicConfig} topicConfig: topic configs
18+
* @param {import('@confluentinc/kafka-javascript').ConsumerGlobalConfig} config: configs for consumer.
19+
* @param {import('@confluentinc/kafka-javascript').ConsumerTopicConfig} topicConfig: topic configs
2020
* @param {EventEmitter} emitter: to emit log events
2121
*/
2222
constructor(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: EventEmitter) {
@@ -80,7 +80,7 @@ class KafkaConsumer extends Client {
8080

8181
/**
8282
* Subscribe to topics.
83-
* @param {import('node-rdkafka').SubscribeTopicList} topics: array of topic names.
83+
* @param {import('@confluentinc/kafka-javascript').SubscribeTopicList} topics: array of topic names.
8484
* @returns {KafkaConsumer}
8585
*/
8686
subscribe(topics: SubscribeTopicList): this {
@@ -185,7 +185,7 @@ class KafkaConsumer extends Client {
185185

186186
/**
187187
* Parses message before passing it to consumer callback.
188-
* @param {Object} msg - expects it to be in node-rdkafka msg format.
188+
* @param {Object} msg - expects it to be in @confluentinc/kafka-javascript msg format.
189189
* @returns
190190
*/
191191
_parseMessage(msg) {

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import getKafkaProducer, { ProduceParameters } from './producer'
22
import getKafkaConsumer, { ConsumeActionFunction, ListenActionFunction } from './consumer';
33
import KafkaAdmin from './admin';
4-
import { ClientMetrics, LibrdKafkaError, NumberNullUndefined, SubscribeTopicList } from 'node-rdkafka';
4+
import { ClientMetrics, LibrdKafkaError, NumberNullUndefined, SubscribeTopicList } from '@confluentinc/kafka-javascript';
55
import { ErrorHandlingFunction } from './client';
66

77
interface KafkaConsumer {

src/producer.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import EventEmitter from 'events';
2-
import Kafka, { ClientMetrics, LibrdKafkaError, MessageKey, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig } from 'node-rdkafka';
2+
import Kafka, { ClientMetrics, LibrdKafkaError, MessageKey, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig } from '@confluentinc/kafka-javascript';
33
import Client, { ErrorHandlingFunction } from './client';
44

55
export interface ProduceParameters{
@@ -18,8 +18,8 @@ class KafkaProducer extends Client {
1818
/**
1919
* Initializes a KafkaProducer.
2020
* @param {String} clientId: id to identify a client producing the message.
21-
* @param {import('node-rdkafka').ProducerGlobalConfig} config: configs for producer.
22-
* @param {import('node-rdkafka').ProducerTopicConfig} topicConfig: topic configs.
21+
* @param {import('@confluentinc/kafka-javascript').ProducerGlobalConfig} config: configs for producer.
22+
* @param {import('@confluentinc/kafka-javascript').ProducerTopicConfig} topicConfig: topic configs.
2323
* @param {EventEmitter} emitter: to emit log messages
2424
*/
2525
constructor(clientId: string, config: ProducerGlobalConfig, topicConfig: ProducerTopicConfig, emitter: EventEmitter) {
@@ -85,10 +85,10 @@ class KafkaProducer extends Client {
8585
/**
8686
* Produce a message to a topic-partition.
8787
* @param {String} topic: name of topic
88-
* @param {import('node-rdkafka').NumberNullUndefined} partition: partition number to produce to.
88+
* @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} partition: partition number to produce to.
8989
* @param {any} message: message to be produced.
90-
* @param {import('node-rdkafka').MessageKey} key: key associated with the message.
91-
* @param {import('node-rdkafka').NumberNullUndefined} timestamp: timestamp to send with the message.
90+
* @param {import('@confluentinc/kafka-javascript').MessageKey} key: key associated with the message.
91+
* @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} timestamp: timestamp to send with the message.
9292
* @returns {import('../types').BooleanOrNumber}: returns boolean or librdkafka error code.
9393
*/
9494
produce({ topic, message, partition = null, key = null, timestamp = null }: ProduceParameters): boolean | number {
@@ -105,7 +105,7 @@ class KafkaProducer extends Client {
105105
/**
106106
* Flush everything on the internal librdkafka buffer.
107107
* Good to perform before disconnect.
108-
* @param {import('node-rdkafka').NumberNullUndefined}} timeout
108+
* @param {import('@confluentinc/kafka-javascript').NumberNullUndefined}} timeout
109109
* @param {import('../types').ErrorHandlingFunction} postFlushAction
110110
* @returns {KafkaProducer}
111111
*/

0 commit comments

Comments
 (0)