v1.16.0 - Lossing data when one node of the cluster is lost #1303

Open
cjimenezsaiz opened this issue Feb 28, 2022 · 15 comments

Comments

@cjimenezsaiz

Describe the bug
I am opening this issue to clarify whether what I see is expected behaviour that the user must handle in some way, or whether it is a bug.
The specific configuration that I have tested:

  • 3 nodes of Kafka v3.0 based on confluentinc/cp-kafka docker images. All of them with this configuration:
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
  • A single producer that tries to write 60 records every 5 seconds with the send method. The configuration of this producer is:
    • Client:
    {
      clientId: 'creator',
      brokers: ['localhost:9092', 'localhost:9093', 'localhost:9093'],
      enforceRequestTimeout: true,
      requestTimeout: 3000,
      connectionTimeout: 3000,
    }
    • Producer:
    {
      maxInFlightRequests: 1,
      transactionalId: 'my-transactional-producer',
      idempotent: true,
      transactionTimeout: 3000,
      retry: { retries: Number.MAX_SAFE_INTEGER },
    }

I log every job (the result of a send call), and in normal operation I see something like:

The size of the test is: 60
{"level":"ERROR","timestamp":"2022-02-28T09:28:59.133Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"localhost:9092","clientId":"creator","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":2,"size":123}
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "0",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 1,
    "errorCode": 0,
    "baseOffset": "0",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "0",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 1 - 60/600
dbsave: 1.090s
The size of the test is: 60
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "20",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 1,
    "errorCode": 0,
    "baseOffset": "20",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "20",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 2 - 120/600
dbsave: 25.286ms

In the middle of the test I stop one of the servers (not the controller of the cluster) with docker stop 85fsa..., and in most cases, though not always, this is the log that I receive:

The size of the test is: 60
{"level":"ERROR","timestamp":"2022-02-28T09:29:09.132Z","logger":"kafkajs","message":"[Producer] Failed to send messages: Closed connection","retryCount":0,"retryTime":243}
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "40",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "40",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 3 - 180/600
dbsave: 1.262s
The size of the test is: 60
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "60",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 1,
    "errorCode": 0,
    "baseOffset": "40",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "60",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 4 - 240/600
dbsave: 9.988ms

As you can see, the send to one of the servers failed, but the send method resolved with the responses of only 2 servers, and the failed send is not retried.

In the last job you can see the final result: we have lost 20 messages without any warning from the library:

The size of the test is: 60
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "180",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 1,
    "errorCode": 0,
    "baseOffset": "160",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "180",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 10 - 600/600
dbsave: 7.462ms
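
The figure of 20 lost messages can be checked from the logged offsets. The sketch below is only an illustration: it assumes that every batch of 60 is spread evenly over the 3 partitions (20 records each, as the earlier logs suggest), and the helper name missingPerPartition is made up for this example.

```typescript
// Final baseOffset values copied from the OPERATION 10 log above.
const finalBaseOffsets: Record<number, number> = { 0: 180, 1: 160, 2: 180 };

const batchSize = 60; // messages per send() call
const partitionCount = 3; // KAFKA_NUM_PARTITIONS
const operations = 10; // number of send() calls that resolved

// Assumption: each batch is split evenly, so every partition
// receives batchSize / partitionCount = 20 records per operation.
const perPartition = batchSize / partitionCount;

// baseOffset is the offset of the first record of the batch just written,
// so after the last operation it should be (operations - 1) * perPartition.
const expectedBaseOffset = (operations - 1) * perPartition;

// Hypothetical helper: how many records each partition is short of the expected offset.
function missingPerPartition(offsets: Record<number, number>): Record<number, number> {
  const missing: Record<number, number> = {};
  Object.keys(offsets).forEach(key => {
    const partition = Number(key);
    missing[partition] = expectedBaseOffset - offsets[partition];
  });
  return missing;
}

// Partition 1 comes out 20 records short; partitions 0 and 2 are complete.
console.log(missingPerPartition(finalBaseOffsets));
```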

The snippet of code that I use to reproduce this behaviour is:

import { Kafka, Message } from 'kafkajs';
import fs from 'fs';

var access = fs.createWriteStream('test.log');
//@ts-ignore ignore is a test
process.stdout.write = process.stderr.write = access.write.bind(access);

function createMessages(from: number, to: number): Message[] {
  const messages: Message[] = [];
  for (let index = from; index < to; index += 1) {
    messages.push({ value: JSON.stringify({ index }) });
  }
  return messages;
}
const client = new Kafka({
  clientId: 'creator',
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9093'],
  enforceRequestTimeout: true,
  requestTimeout: 3000,
  connectionTimeout: 3000,
});
const producer = client.producer({
  maxInFlightRequests: 1,
  transactionalId: 'my-transactional-producer',
  idempotent: true,
  transactionTimeout: 3000,
  retry: { retries: Number.MAX_SAFE_INTEGER },
});
const consumer = client.consumer({ groupId: 'myGroup' });

(async () => {
  await producer.connect();
  await consumer.connect();
})();

let pending = false;
let consuming = false;
let count = 0;
let aggregatedValue = 0;
let messagesCount = 0;
const messageBatchSize = 60;
const messageFinalAmount = 60 * 10;
const expectedResult = (messageFinalAmount * (messageFinalAmount - 1)) / 2;

console.log(`The final result should be ${expectedResult}`);

setInterval(async () => {
  if (count === messageFinalAmount / messageBatchSize && !consuming) {
    await consumer.subscribe({ topic: 'myTopic', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ message }) => {
        if (!message.value) {
          throw new Error(`TEST IS BROKEN`);
        }
        const value = JSON.parse(message.value.toString()).index;
        aggregatedValue = aggregatedValue + value;
        console.log(
          `${value} - ${aggregatedValue} - ${
            aggregatedValue === expectedResult ? 'ALL MESSAGE HAS BEEN RECEIVED' : 'PENDING ...'
          }`
        );
      },
    });
    consuming = true;
    return;
  }
  if (pending || consuming) {
    return;
  }
  const messages = createMessages(messagesCount, messagesCount + messageBatchSize);
  console.log(`The size of the test is: ${messages.length}`);
  console.time('dbsave');
  pending = true;
  try {
    const result = await producer.send({ topic: 'myTopic', acks: -1, messages, timeout: 3000 });
    console.log(JSON.stringify(result, null, 2));
    count++;
    messagesCount += messageBatchSize;
    console.log(`OPERATION ${count} - ${messagesCount}/${messageFinalAmount}`);
    console.timeEnd('dbsave');
    pending = false;
  } catch (error) {
    console.log(`ERROR IN OPERATION ${error.message}`);
    console.timeEnd('dbsave');
    pending = false;
  }
}, 5000);

And the docker-compose file:

version: '3.8'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
  kafka-1:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
  kafka-2:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9093:9093'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
  kafka-3:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9094:9094'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
  kowl:
    image: quay.io/cloudhut/kowl:master
    depends_on:
      - zookeeper-1
      - kafka-1
      - kafka-2
      - kafka-3
    environment:
      KAFKA_BROKERS: kafka-1:29092,kafka-2:29093, kafka-3:29094
    ports:
      - "8080:8080"

Expected behavior
I expect the method to reject with an error so that I can retry the operation myself, or try to find the server that is now responsible for the partition.

Observed behavior
Even though one of the servers fails in the middle of sending new records to the cluster, the send method resolves without error and the data is lost.

Environment:

  • OS: Linux Ubuntu 20.04
  • KafkaJS version: 1.16.0 and 1.15.0
  • Kafka version: 3.0
  • NodeJS version: 14.18.2
@cjimenezsaiz
Author

cjimenezsaiz commented Mar 5, 2022

Hi there,

I've debugged the issue a little more. I don't know the code well enough, but I think I have found where the problem is.

In the sendMessages file, the responses of the brokers are tracked by the Map responsePerBroker.

/** @type {Map<import("../../types").Broker, any[]>} */
const responsePerBroker = new Map()
/** @param {Map<import("../../types").Broker, any[]>} responsePerBroker */
const createProducerRequests = async responsePerBroker => {
  const topicMetadata = new Map()

If the request is fulfilled, the response is set, but if the request fails, the entry for that broker is deleted.

  const formattedResponse = expectResponse ? responseSerializer(response) : []
  responsePerBroker.set(broker, formattedResponse)
} catch (e) {
  responsePerBroker.delete(broker)
  throw e

This logic is fine when the request fails but the broker is still there. In cases where the broker is lost (from 3 brokers in the cluster down to 2), the next retry of createProducerRequests performs no request at all, because we now have only 2 brokers and both of them already have a response in the Map responsePerBroker (brokersWithoutResponse will be empty).

const brokers = Array.from(responsePerBroker.keys())
const brokersWithoutResponse = brokers.filter(broker => !responsePerBroker.get(broker))
return brokersWithoutResponse.map(async broker => {
  const entries = Array.from(topicMetadata.entries())

So Promise.all() resolves without executing any request, and the sendMessages function returns the result of the previous iteration.

try {
  const requests = await createProducerRequests(responsePerBroker)
  await Promise.all(requests)
  const responses = Array.from(responsePerBroker.values())
  return flatten(responses)
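
To make the sequence easier to follow, here is a small self-contained model of the behaviour described above. It is a simplified stand-in, not the real kafkajs code: the Broker and PartitionResponse types, the registerLeaders helper, and the assumption that leaders already present in the Map keep their earlier response are all part of the illustration.

```typescript
type Broker = { nodeId: number };
type PartitionResponse = { partition: number; errorCode: number };

const responsePerBroker = new Map<Broker, PartitionResponse[] | null>();

// Mirrors the filter quoted above: only brokers without a stored response get a request.
function brokersWithoutResponse(): Broker[] {
  return Array.from(responsePerBroker.keys()).filter(broker => !responsePerBroker.get(broker));
}

// Assumption: current leaders are registered on every attempt,
// but responses stored by an earlier attempt are kept.
function registerLeaders(leaders: Broker[]): void {
  leaders.forEach(leader => {
    if (!responsePerBroker.has(leader)) responsePerBroker.set(leader, null);
  });
}

const broker1: Broker = { nodeId: 1 };
const broker2: Broker = { nodeId: 2 };
const broker3: Broker = { nodeId: 3 };

// Attempt 1: three leaders, broker2 dies while its request is in flight.
registerLeaders([broker1, broker2, broker3]);
responsePerBroker.set(broker1, [{ partition: 0, errorCode: 0 }]);
responsePerBroker.delete(broker2); // the catch branch
responsePerBroker.set(broker3, [{ partition: 2, errorCode: 0 }]);

// Attempt 2 (retry): partition 1 is now led by a surviving broker,
// which already has a response from attempt 1.
registerLeaders([broker1, broker3]);
const retryTargets = brokersWithoutResponse();

// What the caller finally receives: the responses of attempt 1 only.
const returned: PartitionResponse[] = [];
Array.from(responsePerBroker.values()).forEach(response => {
  if (response) returned.push(...response);
});

console.log(retryTargets.length); // 0, so no request is sent on the retry
console.log(returned.map(r => r.partition)); // partitions 0 and 2, partition 1 is missing
```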

I think this logic should be based on partitions instead of brokers.

Let me know your thoughts.

@Nevon Nevon added the bug label Mar 9, 2022
@Nevon
Collaborator

Nevon commented Mar 9, 2022

This is clearly a bug.

> I think this logic should be based on partitions instead of brokers.

So if your analysis is correct, then I suppose what's needed is to detect during retries that we still have outstanding partitions to produce to, check who the new leader for each of those partitions is, and then issue new requests.

I'm not sure about the cluster behavior during this operation though. If it's a graceful shutdown of a broker in the cluster, I would expect leadership to change to one of the other brokers, in which case the above approach should work, but if it's an ungraceful shutdown I'm not sure that this will make a difference because the automatic leadership rebalance only kicks in after 5 minutes by default. Regardless, at least we should be able to throw a meaningful error in the case we can't find a leader for the partition, rather than silently fail as is the case currently.
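
A rough sketch of that partition-based approach might look like the following. Everything here is hypothetical: ProduceDeps, findLeader, produce and sendByPartition are invented names for illustration and are not part of kafkajs. The sketch also assumes that retrying a partition is safe, which in practice depends on the producer being idempotent.

```typescript
type PartitionResult = { partition: number; baseOffset: string };

// Hypothetical dependencies, injected so the sketch stays self-contained.
interface ProduceDeps {
  // Returns the node id of the current leader, or undefined if there is none.
  findLeader(partition: number): number | undefined;
  // Produces the pending records of one partition through the given broker.
  produce(nodeId: number, partition: number): Promise<PartitionResult>;
}

async function sendByPartition(
  partitions: number[],
  deps: ProduceDeps,
  maxAttempts = 3
): Promise<PartitionResult[]> {
  const done = new Map<number, PartitionResult>();
  let pending = partitions.slice();

  for (let attempt = 1; attempt <= maxAttempts && pending.length > 0; attempt++) {
    const stillPending: number[] = [];
    await Promise.all(
      pending.map(async partition => {
        // Look the leader up again on every attempt, since it may have moved.
        const leader = deps.findLeader(partition);
        if (leader === undefined) {
          stillPending.push(partition);
          return;
        }
        try {
          done.set(partition, await deps.produce(leader, partition));
        } catch {
          stillPending.push(partition);
        }
      })
    );
    pending = stillPending;
  }

  // Fail loudly instead of resolving with a partial result.
  if (pending.length > 0) {
    throw new Error(`Could not produce to partitions: ${pending.join(', ')}`);
  }
  return partitions.map(partition => done.get(partition)!);
}
```

The key difference from the broker-keyed Map is that progress is tracked per partition, so a partition whose leader disappeared stays pending until some broker accepts it or the attempts run out.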

@cjimenezsaiz
Author

Hi @Nevon,

As you say, the behaviour will be completely different depending on the server "shutdown style".
A possible solution is to change the value that the send function resolves with, adding a list of the messages that were committed and those that were not, so that the user can retry sending the uncommitted messages themselves. On the second try, if the cluster is able to receive the messages (even with one server fewer), you will either send all the messages or receive a new error with a clear message about the actual problem.

This is the way that some SDKs solve this kind of issue.

For example, this is the response of the AWS SDK for the Kinesis client or the Firehose client:

{
    "FailedRecordCount": 2,
    "Records": [
        {
            "SequenceNumber": "49543463076548007577105092703039560359975228518395012686", 
            "ShardId": "shardId-000000000000"
        }, 
        {
            "ErrorCode": "ProvisionedThroughputExceededException",
            "ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
        },
        {
            "ErrorCode": "InternalFailure",
            "ErrorMessage": "Internal service failure."
        }
    ]
}

Of course, we must select the correct format to be sure that we don't break any previous implementation.
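
As a sketch of what such a result could look like for this library, assuming a per-partition outcome list in the spirit of the Kinesis response above (the names PartitionOutcome, PartialSendResult, toPartialSendResult and partitionsToRetry are invented for illustration and are not an existing kafkajs API):

```typescript
// One entry per partition: either committed with an offset, or failed with a reason.
type PartitionOutcome =
  | { partition: number; status: 'committed'; baseOffset: string }
  | { partition: number; status: 'failed'; errorMessage: string };

interface PartialSendResult {
  failedPartitionCount: number;
  outcomes: PartitionOutcome[];
}

function toPartialSendResult(outcomes: PartitionOutcome[]): PartialSendResult {
  const failedPartitionCount = outcomes.filter(o => o.status === 'failed').length;
  return { failedPartitionCount, outcomes };
}

// The caller can resend only what was not committed.
function partitionsToRetry(result: PartialSendResult): number[] {
  return result.outcomes.filter(o => o.status === 'failed').map(o => o.partition);
}

// OPERATION 3 from the log above, expressed in this shape.
const operation3 = toPartialSendResult([
  { partition: 0, status: 'committed', baseOffset: '40' },
  { partition: 1, status: 'failed', errorMessage: 'Closed connection' },
  { partition: 2, status: 'committed', baseOffset: '40' },
]);

console.log(operation3.failedPartitionCount); // 1
console.log(partitionsToRetry(operation3)); // [ 1 ]
```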

@Nevon
Collaborator

Nevon commented Mar 9, 2022

I think if we have any failures that we couldn't handle, we should reject producer.send. We have similar situations with other operations that do multiple things towards multiple brokers, where the operation can partially fail. For example, admin.createTopics can succeed for one topic and fail for another. #1104 is a good example of how we handle that. I think we could do something similar for this.
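
One way to picture a rejection that still reports the partial failure is an error object that carries the failed partitions. This is only a sketch under assumptions: PartialProduceError, PartitionFailure and assertAllProduced are hypothetical names, and nothing here is taken from how #1104 is implemented.

```typescript
type PartitionFailure = { topic: string; partition: number; cause: Error };

// Hypothetical error type: rejects the send, but tells the caller what failed.
class PartialProduceError extends Error {
  readonly failures: PartitionFailure[];

  constructor(failures: PartitionFailure[]) {
    super(`Failed to produce to ${failures.length} partition(s)`);
    this.name = 'PartialProduceError';
    this.failures = failures;
    // Keeps instanceof working when the class is compiled down to ES5.
    Object.setPrototypeOf(this, PartialProduceError.prototype);
  }
}

// Throws if any partition failed, otherwise does nothing.
function assertAllProduced(failures: PartitionFailure[]): void {
  if (failures.length > 0) {
    throw new PartialProduceError(failures);
  }
}

try {
  assertAllProduced([
    { topic: 'myTopic', partition: 1, cause: new Error('Closed connection') },
  ]);
} catch (error) {
  if (error instanceof PartialProduceError) {
    // The caller knows exactly which partitions need attention.
    console.log(error.failures.map(f => `${f.topic}-${f.partition}`)); // [ 'myTopic-1' ]
  }
}
```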

@cjimenezsaiz
Author

If producer.send is rejected, the user will retry the complete "job", so in most cases some messages will be committed twice, which is not ideal.

@Nevon
Collaborator

Nevon commented Mar 9, 2022

Check it out, this issue has come up before #43. Let's use that instead.

@Nevon Nevon closed this as completed Mar 9, 2022
@Nevon Nevon added duplicate and removed duplicate labels Mar 9, 2022
@Nevon
Collaborator

Nevon commented Mar 9, 2022

Actually, nevermind, this deserves its own issue. Part of it is to communicate the partial failure better to the user, but another part is handling the error to begin with. Realized this one millisecond after closing.

@Nevon Nevon reopened this Mar 9, 2022
@cjimenezsaiz
Author

As extended error classes are currently used in the code, we have to decide whether the correct approach is to resolve with more info or to reject with a special error.

At the same time, we should use Promise.allSettled() instead of Promise.all() to be sure that we know which messages were committed. Here we need to take into account that Node.js 12.9.0 or later will be necessary.
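
A minimal sketch of the difference: Promise.all() rejects on the first failure and discards the other outcomes, while Promise.allSettled() reports every request. The Produced type and the settleAll helper are hypothetical names used only for this example.

```typescript
type Produced = { partition: number; baseOffset: string };

// Waits for every request and separates what was committed from what failed.
async function settleAll(requests: Promise<Produced>[]) {
  const settled = await Promise.allSettled(requests);
  const committed: Produced[] = [];
  const failures: { index: number; reason: string }[] = [];

  settled.forEach((result, index) => {
    if (result.status === 'fulfilled') {
      committed.push(result.value);
    } else {
      failures.push({ index, reason: String(result.reason) });
    }
  });

  return { committed, failures };
}

// Example: the request for partition 1 fails, the other two succeed.
settleAll([
  Promise.resolve({ partition: 0, baseOffset: '40' }),
  Promise.reject(new Error('Closed connection')),
  Promise.resolve({ partition: 2, baseOffset: '40' }),
]).then(({ committed, failures }) => {
  console.log(committed.length, failures.length); // 2 1
});
```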

@t-d-d
Contributor

t-d-d commented Mar 11, 2022 via email

@Igor-Kuzmin
Contributor

Hi,
We also faced this problem, and it is critical for us. I would like to fix it if you don't mind.

@cjimenezsaiz
Author

Hi, no problem from my side. I have had a sudden rush of work and have deferred this task with no forecast date.

@nick4fake

Sorry, any movement on this? Sounds like something that might happen on any production system that uses this library

@eladchen

eladchen commented Mar 24, 2024

So basically this issue means that once a node is down (like when MSK clusters go through security patches, which are frequent) we may end up losing messages?

@cjimenezsaiz
Author

@eladchen Yes

@agilenc

agilenc commented Sep 23, 2024

Any update on this? We are also facing this critical issue. It's been 2y since first reported, would be great to have a fix.


7 participants