Skip to content

Commit

Permalink
MINOR: Add retry mechanism to EOS example (apache#15561)
Browse files Browse the repository at this point in the history

In the initial EOS example, retry logic was implemented within the resetToLastCommittedPositions method. During refactoring, this logic was removed because a poison pill prevented the example from reaching the final phase of consuming from the output topic.

In this change, I suggest adding it back, but with a retry limit defined as MAX_RETRIES. Once this limit is reached, the problematic batch will be logged and skipped, allowing the processor to move on and process the remaining records. If some records are skipped, the example will still hit the hard timeout (2 minutes), but only after consuming all processed records.

Reviewers: Luke Chen <[email protected]>
  • Loading branch information
fvaleri authored Mar 27, 2024
1 parent 9326476 commit 4cb2037
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 6 deletions.
2 changes: 1 addition & 1 deletion examples/src/main/java/kafka/examples/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void run() {
}
}
} catch (Throwable e) {
Utils.printOut("Unhandled exception");
Utils.printErr("Unhandled exception");
e.printStackTrace();
}
Utils.printOut("Fetched %d records", numRecords - remainingRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -49,6 +50,8 @@
* This class implements a read-process-write application.
*/
public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener, AutoCloseable {
private static final int MAX_RETRIES = 5;

private final String bootstrapServers;
private final String inputTopic;
private final String outputTopic;
Expand Down Expand Up @@ -103,19 +106,21 @@ public ExactlyOnceMessageProcessor(String threadName,

@Override
public void run() {
int retries = 0;
int processedRecords = 0;
long remainingRecords = Long.MAX_VALUE;

// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
int transactionTimeoutMs = 10_000;
// consumer must be in read_committed mode, which means it won't be able to read uncommitted data
boolean readCommitted = true;

try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
// called first and once to fence zombies and abort any pending transaction
producer.initTransactions();

consumer.subscribe(singleton(inputTopic), this);

Utils.printOut("Processing new records");
Expand All @@ -140,6 +145,7 @@ public void run() {
// commit the transaction including offsets
producer.commitTransaction();
processedRecords += records.count();
retries = 0;
}
} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
| FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
Expand All @@ -151,18 +157,21 @@ public void run() {
Utils.printOut("Invalid or no offset found, using latest");
consumer.seekToEnd(emptyList());
consumer.commitSync();
retries = 0;
} catch (KafkaException e) {
// abort the transaction and try to continue
Utils.printOut("Aborting transaction: %s", e);
// abort the transaction
Utils.printOut("Aborting transaction: %s", e.getMessage());
producer.abortTransaction();
retries = maybeRetry(retries, consumer);
}

remainingRecords = getRemainingRecords(consumer);
if (remainingRecords != Long.MAX_VALUE) {
Utils.printOut("Remaining records: %d", remainingRecords);
}
}
} catch (Throwable e) {
Utils.printOut("Unhandled exception");
Utils.printErr("Unhandled exception");
e.printStackTrace();
}
Utils.printOut("Processed %d records", processedRecords);
Expand Down Expand Up @@ -215,6 +224,44 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
}).sum();
}

/**
 * Decides how to proceed after a generic {@code KafkaException} was raised while processing a batch of records.
 *
 * <p>While fewer than {@code MAX_RETRIES} attempts have been made, the fetch position of every assigned partition
 * is rewound to its last committed offset (or to the beginning when nothing was committed) so that the same
 * batch is polled again. Once the limit is reached, an error is logged and the current consumer position is
 * committed, which skips the problematic batch and lets processing move on to the next one.
 * In a real world application you may want to send these records to a dead letter topic (DLT) for further processing.
 *
 * @param retries Current number of retries, must be greater than or equal to zero
 * @param consumer Consumer instance
 * @return Updated number of retries: {@code retries + 1} after a rewind, {@code 0} after skipping the batch
 *         or after rejecting a negative {@code retries}
 */
private int maybeRetry(int retries, KafkaConsumer<Integer, String> consumer) {
    if (retries < 0) {
        // zero is a valid value (no retry attempted yet), only negative counters are rejected
        Utils.printErr("The number of retries must be greater than or equal to zero");
        shutdown();
        // do not rewind or commit anything with an invalid counter; hand back a valid one instead
        return 0;
    }

    if (retries < MAX_RETRIES) {
        // retry: reset fetch offset
        // the consumer fetch position needs to be restored to the committed offset before the transaction started
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        consumer.assignment().forEach(tp -> {
            OffsetAndMetadata offsetAndMetadata = committed.get(tp);
            if (offsetAndMetadata != null) {
                consumer.seek(tp, offsetAndMetadata.offset());
            } else {
                // nothing committed yet for this partition: start over from its first available offset
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        });
        return retries + 1;
    }

    // continue: skip records
    // the consumer fetch position needs to be committed as if records were processed successfully
    Utils.printErr("Skipping records after %d retries", MAX_RETRIES);
    consumer.commitSync();
    return 0;
}

@Override
public void close() throws Exception {
if (producer != null) {
Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/java/kafka/examples/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void run() {
sentRecords++;
}
} catch (Throwable e) {
Utils.printOut("Unhandled exception");
Utils.printErr("Unhandled exception");
e.printStackTrace();
}
Utils.printOut("Sent %d records", sentRecords);
Expand Down

0 comments on commit 4cb2037

Please sign in to comment.