Skip to content

Commit

Permalink
KAFKA-14752: Kafka examples improvements - demo changes (apache#13517)
Browse files Browse the repository at this point in the history
KAFKA-14752: Kafka examples improvements - demo changes

Reviewers: Luke Chen <[email protected]>
  • Loading branch information
fvaleri authored May 12, 2023
1 parent 54a4067 commit c757af5
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 188 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,6 @@ project(':examples') {

dependencies {
implementation project(':clients')
implementation project(':server-common')
}

javadoc {
Expand Down
1 change: 0 additions & 1 deletion checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@

<subpackage name="examples">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.util" />
</subpackage>

<subpackage name="log.remote">
Expand Down
12 changes: 0 additions & 12 deletions examples/README

This file was deleted.

9 changes: 9 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Kafka client examples

This module contains some Kafka client examples.

1. Start a Kafka 2.5+ local cluster with a plain listener configured on port 9092.
2. Run `examples/bin/java-producer-consumer-demo.sh 10000` to asynchronously send 10k records to topic1 and consume them.
3. Run `examples/bin/java-producer-consumer-demo.sh 10000 sync` to synchronous send 10k records to topic1 and consume them.
4. Run `examples/bin/exactly-once-demo.sh 6 3 10000` to create input-topic and output-topic with 6 partitions each,
start 3 transactional application instances and process 10k records.
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,59 @@
*/
package kafka.examples;

import org.apache.kafka.common.errors.TimeoutException;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* This example can be decomposed into the following stages:
*
* 1. Clean any topics left from previous runs.
* 2. Create a producer thread to send a set of records to topic1.
* 3. Create a consumer thread to fetch all previously sent records from topic1.
*
* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
* You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
* record all the log output together.
*/
public class KafkaConsumerProducerDemo {
public static void main(String[] args) throws InterruptedException {
boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
CountDownLatch latch = new CountDownLatch(2);
Producer producerThread = new Producer(
"producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
producerThread.start();

Consumer consumerThread = new Consumer(
"consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
consumerThread.start();

if (!latch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
}
public static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static final String TOPIC_NAME = "my-topic";
public static final String GROUP_NAME = "my-group";

public static void main(String[] args) {
try {
if (args.length == 0) {
Utils.printHelp("This example takes 2 parameters (i.e. 10000 sync):%n" +
"- records: total number of records to send (required)%n" +
"- mode: pass 'sync' to send records synchronously (optional)");
return;
}

consumerThread.shutdown();
System.out.println("All finished!");
int numRecords = Integer.parseInt(args[0]);
boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");

// stage 1: clean any topics left from previous runs
Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
CountDownLatch latch = new CountDownLatch(2);

// stage 2: produce records to topic1
Producer producerThread = new Producer(
"producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
producerThread.start();

// stage 3: consume records from topic1
Consumer consumerThread = new Consumer(
"consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
consumerThread.start();

if (!latch.await(5, TimeUnit.MINUTES)) {
Utils.printErr("Timeout after 5 minutes waiting for termination");
producerThread.shutdown();
consumerThread.shutdown();
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
220 changes: 64 additions & 156 deletions examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,185 +16,93 @@
*/
package kafka.examples;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* This exactly once demo driver takes 3 arguments:
* - partition: number of partitions for input/output topic
* - instances: number of instances
* - records: number of records
* An example argument list would be `6 3 50000`.
*
* If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
* Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
* output to file` to record all the log output together.
*
* The driver could be decomposed as following stages:
*
* 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
*
* 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
* the input topic. The driver will block for the record generation to finish, so the producer
* must be in synchronous sending mode.
* This example can be decomposed into the following stages:
*
* 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
* tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
* drain all the records from either given partitions or auto assigned partitions by actively
* comparing log end offset with committed offset. Each record will be processed exactly once
* as dividing the key by 2, and extend the value message. The driver will block for all the record
* processing to finish. The transformed record shall be written to the output topic, with
* transactional guarantee.
* 1. Clean any topics left from previous runs.
* 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
* The demo will block for the record generation to finish, so the producer is synchronous.
* 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
* (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
* partitions or auto assigned partitions by actively comparing log end offset with committed offset.
* Each record will be processed exactly-once with strong partition level ordering guarantee.
* The demo will block until all records are processed and written to the output topic.
* 4. Create a read_committed consumer thread to verify we have all records in the output topic,
* and record ordering at the partition level is maintained.
* The demo will block for the consumption of all committed records, with transactional guarantee.
*
* 4. Set up a read committed consumer in a separate thread to verify we have all records within
* the output topic, while the message ordering on partition level is maintained.
* The driver will block for the consumption of all committed records.
*
* From this demo, you could see that all the records from pre-population are processed exactly once,
* with strong partition level ordering guarantee.
*
* Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
* in order to run, otherwise the app could throw
* Broker version must be >= 2.5.0 in order to run, otherwise the example will throw
* {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
*
* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
* You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
* record all the log output together.
*/
public class KafkaExactlyOnceDemo {
public static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";
public static final String GROUP_NAME = "check-group";

public static void main(String[] args) throws InterruptedException, ExecutionException {
if (args.length != 3) {
throw new IllegalArgumentException("Should accept 3 parameters: " +
"[number of partitions], [number of instances], [number of records]");
}

int numPartitions = Integer.parseInt(args[0]);
int numInstances = Integer.parseInt(args[1]);
int numRecords = Integer.parseInt(args[2]);

/* Stage 1: topic cleanup and recreation */
recreateTopics(numPartitions);

CountDownLatch prePopulateLatch = new CountDownLatch(1);

/* Stage 2: pre-populate records */
Producer producerThread = new Producer(
"producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
producerThread.start();

if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
}

CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);

/* Stage 3: transactionally process all messages */
CountDownLatch processorsLatch = new CountDownLatch(numInstances);
List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
.mapToObj(id -> new ExactlyOnceMessageProcessor(
"processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
.collect(Collectors.toList());
processors.forEach(ExactlyOnceMessageProcessor::start);

if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
}

CountDownLatch consumeLatch = new CountDownLatch(1);

/* Stage 4: consume all processed messages to verify exactly once */
Consumer consumerThread = new Consumer(
"consumer", "DemoConsumer", OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
consumerThread.start();

if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
}

consumerThread.shutdown();
System.out.println("All finished!");
}

private static void recreateTopics(final int numPartitions)
throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);

Admin adminClient = Admin.create(props);

List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);

deleteTopic(adminClient, topicsToDelete);

// Check topic existence in a retry loop
while (true) {
System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete);

Set<String> listedTopics = adminClient.listTopics().names().get();
System.out.println("Current list of topics: " + listedTopics);

boolean hasTopicInfo = false;
for (String listedTopic : listedTopics) {
if (topicsToDelete.contains(listedTopic)) {
hasTopicInfo = true;
break;
}
}
if (!hasTopicInfo) {
break;
public static void main(String[] args) {
try {
if (args.length != 3) {
Utils.printHelp("This example takes 3 parameters (i.e. 6 3 10000):%n" +
"- partition: number of partitions for input and output topics (required)%n" +
"- instances: number of application instances (required)%n" +
"- records: total number of records (required)");
return;
}
Thread.sleep(1000);
}

// Create topics in a retry loop
while (true) {
final short replicationFactor = 1;
final List<NewTopic> newTopics = Arrays.asList(
new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
try {
adminClient.createTopics(newTopics).all().get();
System.out.println("Created new topics: " + newTopics);
break;
} catch (ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException)) {
throw e;
}
System.out.println("Metadata of the old topics are not cleared yet...");

deleteTopic(adminClient, topicsToDelete);
int numPartitions = Integer.parseInt(args[0]);
int numInstances = Integer.parseInt(args[1]);
int numRecords = Integer.parseInt(args[2]);

// stage 1: clean any topics left from previous runs
Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);

// stage 2: send demo records to the input-topic
CountDownLatch producerLatch = new CountDownLatch(1);
Producer producerThread = new Producer(
"producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
producerThread.start();
if (!producerLatch.await(2, TimeUnit.MINUTES)) {
Utils.printErr("Timeout after 2 minutes waiting for data load");
producerThread.shutdown();
return;
}

Thread.sleep(1000);
// stage 3: read from input-topic, process once and write to the output-topic
CountDownLatch processorsLatch = new CountDownLatch(numInstances);
List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
.mapToObj(id -> new ExactlyOnceMessageProcessor(
"processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
.collect(Collectors.toList());
processors.forEach(ExactlyOnceMessageProcessor::start);
if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
Utils.printErr("Timeout after 2 minutes waiting for record copy");
processors.forEach(ExactlyOnceMessageProcessor::shutdown);
return;
}
}
}

private static void deleteTopic(final Admin adminClient, final List<String> topicsToDelete)
throws InterruptedException, ExecutionException {
try {
adminClient.deleteTopics(topicsToDelete).all().get();
} catch (ExecutionException e) {
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
throw e;
// stage 4: check consuming records from the output-topic
CountDownLatch consumerLatch = new CountDownLatch(1);
Consumer consumerThread = new Consumer(
"consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch);
consumerThread.start();
if (!consumerLatch.await(2, TimeUnit.MINUTES)) {
Utils.printErr("Timeout after 2 minutes waiting for output read");
consumerThread.shutdown();
}
System.out.println("Encountered exception during topic deletion: " + e.getCause());
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("Deleted old topics: " + topicsToDelete);
}
}

0 comments on commit c757af5

Please sign in to comment.