diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index bd652e0a32ef1..aa971812f8758 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -109,7 +109,7 @@ public void run() {
                 }
             }
         } catch (Throwable e) {
-            Utils.printOut("Unhandled exception");
+            Utils.printErr("Unhandled exception");
             e.printStackTrace();
         }
         Utils.printOut("Fetched %d records", numRecords - remainingRecords);
diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
index cf12cc9f15812..15488c9c47d23 100644
--- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -36,6 +36,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -49,6 +50,8 @@
  * This class implements a read-process-write application.
  */
 public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener, AutoCloseable {
+    private static final int MAX_RETRIES = 5;
+
     private final String bootstrapServers;
     private final String inputTopic;
     private final String outputTopic;
@@ -103,19 +106,21 @@ public ExactlyOnceMessageProcessor(String threadName,
 
     @Override
     public void run() {
+        int retries = 0;
         int processedRecords = 0;
         long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
         int transactionTimeoutMs = 10_000;
         // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
         boolean readCommitted = true;
+
         try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
                 true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
              KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
                  "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
             // called first and once to fence zombies and abort any pending transaction
             producer.initTransactions();
-
             consumer.subscribe(singleton(inputTopic), this);
 
             Utils.printOut("Processing new records");
@@ -140,6 +145,7 @@ public void run() {
                         // commit the transaction including offsets
                         producer.commitTransaction();
                         processedRecords += records.count();
+                        retries = 0;
                     }
                 } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
                              | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
@@ -151,18 +157,21 @@ public void run() {
                     Utils.printOut("Invalid or no offset found, using latest");
                     consumer.seekToEnd(emptyList());
                     consumer.commitSync();
+                    retries = 0;
                 } catch (KafkaException e) {
-                    // abort the transaction and try to continue
-                    Utils.printOut("Aborting transaction: %s", e);
+                    // abort the transaction
+                    Utils.printOut("Aborting transaction: %s", e.getMessage());
                     producer.abortTransaction();
+                    retries = maybeRetry(retries, consumer);
                 }
+
                 remainingRecords = getRemainingRecords(consumer);
                 if (remainingRecords != Long.MAX_VALUE) {
                     Utils.printOut("Remaining records: %d", remainingRecords);
                 }
             }
         } catch (Throwable e) {
-            Utils.printOut("Unhandled exception");
+            Utils.printErr("Unhandled exception");
             e.printStackTrace();
         }
         Utils.printOut("Processed %d records", processedRecords);
@@ -215,6 +224,44 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }
 
+    /**
+     * When we get a generic {@code KafkaException} while processing records, we retry up to {@code MAX_RETRIES} times.
+     * If we exceed this threshold, we log an error and move on to the next batch of records.
+     * In a real-world application you may want to send these records to a dead letter topic (DLT) for further processing.
+     *
+     * @param retries Current number of retries
+     * @param consumer Consumer instance
+     * @return Updated number of retries
+     */
+    private int maybeRetry(int retries, KafkaConsumer<Integer, String> consumer) {
+        if (retries < 0) {
+            Utils.printErr("The number of retries must not be negative");
+            shutdown();
+        }
+
+        if (retries < MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+            retries++;
+        } else {
+            // continue: skip records
+            // the consumer fetch position needs to be committed as if records were processed successfully
+            Utils.printErr("Skipping records after %d retries", MAX_RETRIES);
+            consumer.commitSync();
+            retries = 0;
+        }
+        return retries;
+    }
+
     @Override
     public void close() throws Exception {
         if (producer != null) {
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index 36a2583954ca8..d9d454c15593a 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -89,7 +89,7 @@ public void run() {
                 sentRecords++;
             }
         } catch (Throwable e) {
-            Utils.printOut("Unhandled exception");
+            Utils.printErr("Unhandled exception");
             e.printStackTrace();
         }
         Utils.printOut("Sent %d records", sentRecords);
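
For context, the offset rewind that maybeRetry performs before a retry can be read in isolation. The sketch below is illustrative only and not part of the patch: the OffsetRewindSketch class and the rewindToLastCommitted helper are invented names, and it assumes a KafkaConsumer<Integer, String> whose offsets are committed through producer.sendOffsetsToTransaction(), as in the processor above.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetRewindSketch {
    // Hypothetical helper mirroring the rewind in maybeRetry: move every
    // assigned partition back to its last committed offset, so records read
    // inside an aborted transaction are fetched again on the next poll.
    public static void rewindToLastCommitted(KafkaConsumer<Integer, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata offset = committed.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset.offset()); // replay from the last committed position
            } else {
                consumer.seekToBeginning(Collections.singleton(tp)); // nothing committed yet
            }
        }
    }
}

Seeking back to the last committed offset, rather than simply polling on from the current position, is what preserves the exactly-once semantics on retry: the records read inside the aborted transaction are replayed from the previous checkpoint instead of being silently skipped.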