
Incompatibility between KafkaJS and Confluent "round robin" partition assignment #208

@apeloquin-agilysys

Description

Environment Information

  • OS: macOS Sonoma 14.6.1 (Apple M3)
  • Node Version: 20.14.0
  • NPM Version: 10.7.0
  • confluent-kafka-javascript version: 0.5.2

Steps to Reproduce

Here's the scenario we're trying to solve:

  • Existing production application using KafkaJS
  • Need a zero-downtime transition to a new version of the same application that uses the Confluent library
  • To provide continuity and avoid lost or reprocessed messages, both versions of the app use the same consumer group IDs
  • Typical Kubernetes rollout: pods running the new version are brought online, and then pods running the old version are shut down

The problem:

Consumer encountered error while consuming. Retrying. Error details: KafkaJSProtocolError: Broker: Inconsistent group protocol : Error: Broker: Inconsistent group protocol

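Although both libraries default to round-robin assignment, they advertise it to the broker under different protocol names, so the broker rejects a new member that shares no protocol name with the existing members of the group. Looking at the groups on the server, I see: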

Library     partitionAssignor (as reported by the broker)
KafkaJS     RoundRobinAssigner
Confluent   roundrobin
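To illustrate why the differing names matter, here is a simplified TypeScript model. It assumes the broker admits a member only when all members, including the one joining, share at least one advertised protocol name; the real group coordinator also compares the protocol type and then picks one common protocol by vote, which this sketch leaves out. `commonProtocols` and `joinResult` are hypothetical helpers, not part of KafkaJS or the Confluent library.

```typescript
// Simplified model (assumption): a member may join only if every member,
// including the one joining, advertises at least one protocol name in common.
// The real coordinator also compares the protocol type ("consumer") and then
// picks one common protocol by vote; both details are omitted here.

type ProtocolNames = ReadonlySet<string>;

// Intersection of the protocol names advertised by all members.
function commonProtocols(members: ProtocolNames[]): Set<string> {
  if (members.length === 0) return new Set<string>();
  const [first, ...rest] = members;
  return new Set(Array.from(first).filter(name => rest.every(m => m.has(name))));
}

// Hypothetical helper: the error a joining member would get under this model, or null.
function joinResult(current: ProtocolNames[], joining: ProtocolNames): string | null {
  return commonProtocols([...current, joining]).size > 0 ? null : "INCONSISTENT_GROUP_PROTOCOL";
}

// Protocol names reported by the broker for each library's default assigner.
const kafkaJSMember: ProtocolNames = new Set(["RoundRobinAssigner"]);
const confluentMember: ProtocolNames = new Set(["roundrobin"]);

console.log(joinResult([kafkaJSMember], confluentMember)); // INCONSISTENT_GROUP_PROTOCOL
console.log(joinResult([confluentMember], kafkaJSMember)); // INCONSISTENT_GROUP_PROTOCOL
console.log(joinResult([kafkaJSMember], kafkaJSMember));   // null
```

Under this model the join order does not matter: whichever library joins second finds no protocol name in common with the existing group.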

I'm not sure whether this will be considered a bug; I realize it may fall under the category of "as expected" or "unavoidable". If that's the case, my ask is for help determining a zero-downtime rollout strategy for the transition. Today, the new (Confluent) version of the app comes up successfully in an environment only if we first shut down all instances of the old (KafkaJS) version. That is not a viable option for us in production.
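One possible direction, offered as an assumption rather than a documented migration path for either library, is a two-step rollout through an intermediate KafkaJS release that advertises both protocol names, so that every mix of pods present during a rolling update still shares at least one name. The TypeScript sketch below checks each phase with the same simplified rule (all members must share a protocol name); `kafkaJSBridge` and `hasCommonProtocol` are hypothetical names, and whether KafkaJS can be configured this way for this setup, and whether its assignments interoperate with librdkafka members, is not verified here.

```typescript
// Hypothetical two-step rollout, checked with a simplified rule:
// the group stays consistent only while all members share a protocol name.
// ASSUMPTION: an intermediate "bridge" KafkaJS release can advertise both
// "RoundRobinAssigner" and "roundrobin". This is not verified here.

type Member = ReadonlySet<string>;

const kafkaJSv1: Member = new Set(["RoundRobinAssigner"]);
const kafkaJSBridge: Member = new Set(["RoundRobinAssigner", "roundrobin"]); // hypothetical
const confluentV2: Member = new Set(["roundrobin"]);

// True if at least one protocol name is advertised by every member.
function hasCommonProtocol(members: Member[]): boolean {
  if (members.length === 0) return true;
  const [first, ...rest] = members;
  return Array.from(first).some(name => rest.every(m => m.has(name)));
}

// During a rolling update old and new pods coexist, so each phase lists
// the versions that can be in the group at the same time.
const phases: Array<[string, Member[]]> = [
  ["direct: kafkajs-v1 -> confluent", [kafkaJSv1, confluentV2]],
  ["step 1: kafkajs-v1 -> bridge", [kafkaJSv1, kafkaJSBridge]],
  ["step 2: bridge -> confluent", [kafkaJSBridge, confluentV2]],
];

for (const [phase, members] of phases) {
  console.log(`${phase}: ${hasCommonProtocol(members) ? "ok" : "INCONSISTENT_GROUP_PROTOCOL"}`);
}
```

Under this model only the direct transition is rejected. Step 2 must not begin until no `kafkajs-v1` pods remain, because a group containing all three versions has no common name. Once only bridge and Confluent members remain, `roundrobin` is the only shared name, so the bridge release would also have to produce assignments that librdkafka members accept.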

I've included (perhaps unnecessarily) a few tests that highlight the incompatibility.

import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";
import {Consumer, ConsumerConfig, Kafka, PartitionAssigner, PartitionAssigners} from "kafkajs";

const TOPIC = "test-confluent-topic";
const GROUP_ID = "test-confluent-group";

describe("Partition assigners", () => {
  let kafkaJSKafka: Kafka;
  let kafkaJSConsumer: Consumer;

  let confluentKafka: Confluent.Kafka;
  let confluentAdmin: Confluent.Admin;
  let confluentConsumer: Confluent.Consumer;

  before(async () => {
    kafkaJSKafka = new Kafka({brokers: ["localhost:9092"]});

    confluentKafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});

    confluentAdmin = confluentKafka.admin();
    await confluentAdmin.connect();
    await confluentAdmin.createTopics({topics: [{topic: TOPIC}]});
    await confluentAdmin.disconnect();
  });

  afterEach(async () => {
    await kafkaJSConsumer?.disconnect();
    await confluentConsumer?.disconnect();
  });

  it("KafkaJS and Confluent both with default (KafkaJS first)", async () => {
    await setupKafkaJSConsumer();
    await setupConfluentConsumer();
  });

  it("KafkaJS and Confluent both with default (Confluent first)", async () => {
    await setupConfluentConsumer();
    await setupKafkaJSConsumer();
  });

  it("KafkaJS and Confluent both explicitly set to round robin", async () => {
    await setupKafkaJSConsumer(PartitionAssigners.roundRobin);
    await setupConfluentConsumer(Confluent.PartitionAssigners.roundRobin);
  });

  async function setupKafkaJSConsumer(...assigners: PartitionAssigner[]) {
    let ready = false;
    const config: ConsumerConfig = {groupId: GROUP_ID};
    if (assigners.length) config.partitionAssigners = assigners;
    kafkaJSConsumer = kafkaJSKafka.consumer(config);
    // Mark the consumer ready once it has joined the group.
    kafkaJSConsumer.on(kafkaJSConsumer.events.GROUP_JOIN, () => {
      setImmediate(() => (ready = true));
    });
    await kafkaJSConsumer.connect();
    await kafkaJSConsumer.subscribe({topic: TOPIC});
    await kafkaJSConsumer.run({
      eachMessage: async () => {}
    });
    await until(() => ready);
  }

  async function setupConfluentConsumer(...assigners: Confluent.PartitionAssigners[]) {
    let ready = false;
    const config: Confluent.ConsumerConstructorConfig = {
      kafkaJS: {groupId: GROUP_ID, fromBeginning: true},
      rebalance_cb: (err: any, assignment: any, consumer: any) => {
        // Ready only once partitions have been assigned; ignore revocations and other events.
        if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
        ready = true;
      }
    };
    if (assigners.length) config.kafkaJS!.partitionAssigners = assigners;
    confluentConsumer = confluentKafka.consumer(config);
    await confluentConsumer.connect();
    await confluentConsumer.subscribe({topic: TOPIC});
    await confluentConsumer.run({
      eachMessage: async () => {}
    });
    await until(() => ready);
  }

  async function until(condition: () => boolean) {
    const timeout = 10000;
    const finish = Date.now() + timeout;
    while (Date.now() <= finish) {
      const result = condition();
      if (result) return;
      await new Promise(resolve => setTimeout(resolve, 50));
    }
    throw new Error(`Condition not met within ${timeout}ms`);
  }
});

Resulting Confluent error:

{
  message: 'Consumer encountered error while consuming. Retrying. Error details: KafkaJSProtocolError: Broker: Inconsistent group protocol : Error: Broker: Inconsistent group protocol\n' +
    '    at Function.createLibrdkafkaError [as create] (/xxxxx/node_modules/@confluentinc/kafka-javascript/lib/error.js:456:10)\n' +
    '    at /xxxxx/node_modules/@confluentinc/kafka-javascript/lib/kafka-consumer.js:536:29\n' +
    '    at callbackTrampoline (node:internal/async_hooks:130:17)',
  name: 'rdkafka#consumer-2',
  fac: 'BINDING',
  timestamp: 1734045157428
}

Labels: question (A question is asked)
