Updated Kafka examples for 1.1.0.RELEASE
Marius Bogoevici committed Feb 10, 2015
1 parent 89b12b6 commit 6748e32
Showing 5 changed files with 88 additions and 48 deletions.
59 changes: 39 additions & 20 deletions kafka-message-bus/README.md
Kafka Message Bus
=================

In this example, you will learn how to use [Apache Kafka](http://kafka.apache.org) as an inter-container transport for Spring XD.

We will begin by demonstrating a very simple configuration based on the single-node mode, and we will follow with a more elaborate one, showcasing distribution and failover.

Prerequisites
-------------

In order to get started, make sure that you have the following components installed:

* Spring XD 1.1.0.RELEASE ([instructions](https://github.com/spring-projects/spring-xd/wiki/Getting-Started))
* Kafka 0.8.1.1 - including ZooKeeper ([instructions](http://kafka.apache.org/documentation.html#quickstart))


Single-node mode
----------------

While Spring XD applications can take full advantage of the Kafka message bus in distributed mode, the standalone mode is a quick and easy way to get started.

To begin, make sure that Spring XD is installed correctly.

Start Zookeeper and Kafka using default settings.

In the shell, deploy a simple stream.

xd> stream create httptest --definition "http --port=9999 | log" --deploy

Once the stream is deployed, a Kafka topic with the name `httptest.0` is created for the stream. Partitioning is set to one (as the singlenode application has a single container).

Next, monitor the state of the topics. From the Apache Kafka installation directory, run:

The expected result should look as follows:
Topic: httptest.0 Partition: 0 Leader: 0 Replicas: 0 Isr: 0


In order to test that the message bus works, send some data from the shell to the http endpoint and watch for the output in the logs.

xd>http post localhost:9999 -d "1"

xd>http post localhost:9999 -d "2"

xd>http post localhost:9999 -d "3"

You should see the output in the console, which indicates that the http and log endpoints communicate successfully through the Kafka message bus.

Distributed mode
----------------

In order to fully demonstrate the capabilities of using Kafka as a transport, we will start Spring XD in distributed mode.

### Default stream partitioning

From the shell, create another stream:

xd> stream create httptest-dist --definition "http --port=9998 | log"


For deploying the stream, you will use two settings that will illustrate how Kafka topic partitioning works with Spring XD. To do so, you will deploy 2 instances of the logging module. As a result, Spring XD will create a Kafka topic with two partitions, one for each module. Execute the following command in the Spring XD shell (make sure that it is all copied on one line):

xd> stream deploy httptest-dist --properties "module.log.count=2"
The output should contain:
Topic: httptest-dist.0 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: httptest-dist.0 Partition: 1 Leader: 0 Replicas: 0 Isr: 0

Note the topic created for the stream, and its two partitions. Each Spring XD container will listen to one of these partitions.

Again, you will confirm that the message bus is functional by sending a few messages and monitoring the log:

You should see the output in the containers.

### Controlled stream partitioning and failover

By default, when the HTTP endpoint sends a message to the bus, and therefore to the associated Apache Kafka topic, a partition is chosen randomly. This is a useful default that ensures that the messages are balanced evenly between consumers.

Sometimes, there are business reasons for controlling stream partitioning in Spring XD, and the complete functionality is described [here](https://github.com/spring-projects/spring-xd/wiki/Deployment#stream-partitioning).

In this case, we would like to showcase the fact that the messages sent by the HTTP endpoint are indeed sent to different partitions of the topic, and that each container receives messages sent to a particular partition. In order to do so, undeploy the previous example first (if necessary):

xd> stream undeploy httptest-dist

Subsequently, redeploy it with an additional property, which instructs Spring XD to pick up one partition or another based on the message payload.

xd> stream deploy httptest-dist \
--properties "module.log.count=2,module.http.producer.partitionKeyExpression=payload"

As a result of the previous command, an expression of the form `payload.hashCode()%2` will be evaluated for each message, choosing either partition 0 or partition 1 as destination. (Spring XD will use the modulus of 2, because there are 2 partitions.)
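To make the routing concrete, here is a small, self-contained sketch (our illustration, not Spring XD code) of how `payload.hashCode()%2` maps single-digit string payloads to partitions. For a one-character `String`, `hashCode()` is just the character's code point, so `"1"` (49) lands on partition 1 and `"2"` (50) on partition 0; the `Math.abs` guard is our addition to keep the result in range for arbitrary payloads.

```java
// Illustrative check of the partition-selection expression payload.hashCode() % 2.
public class PartitionKeyDemo {
    static int partitionFor(String payload, int partitionCount) {
        // For single-character payloads, String.hashCode() equals the code point
        return Math.abs(payload.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        for (String payload : new String[] {"1", "2", "3", "4"}) {
            System.out.println(payload + " -> partition " + partitionFor(payload, 2));
        }
    }
}
```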

Now start sending messages - as a side effect of our partitioning strategy, odd numbers are sent to one container, even numbers to the other (because the hash code of a single-digit string has the same parity as the digit itself).

xd> http post http://localhost:9999 --data "1"

Next, shut down one of the containers, and continue sending data.

xd> http post http://localhost:9999 --data "6"

Note that the remaining container will receive the messages that belong to it, thus proving that message processing continues for the stream (albeit in a single container).

Finally, restart the previous container.


xd> http post http://localhost:9999 --data "9"

After the second container has been restarted, each container will receive messages from a single partition again, restoring the distributed topology that you started initially.

Conclusion
----------
In this demo, you have learned:

* how to set up Kafka as a message bus for Spring XD;
* how Spring XD manages topic partitioning when creating a Kafka-based message bus;
* how Spring XD handles partition distribution across containers in a distributed scenario by allocating a number of partitions at start;
* how in the case of container failure, in-flight messages are redirected to remaining working containers;
* how the addition (or restart) of a container causes a redistribution of the partitions across containers.

64 changes: 42 additions & 22 deletions kafka-source/README.md
Kafka Source Demo
=================

In this example, you will learn how to set up an [Apache Kafka](http://kafka.apache.org) source for Spring XD.

We will begin by demonstrating a very simple configuration based on the single node mode, and we will follow up with a more elaborate one, demonstrating distribution and failover.

Prerequisites
-------------
In order to get started, make sure that you have the following components installed:
Single-node mode
----------------

While Spring XD applications can take full advantage of the Apache Kafka message bus while working in distributed mode, Spring XD's singlenode mode is a quick and easy way to get started.

Start Zookeeper 3.4.6. There is an installation script for this version in the Spring XD distribution, or you can download your own copy. In the Zookeeper installation directory, start the server:

```
$ ./bin/zkServer.sh start-foreground
```

Now create a topic in Kafka. From the Kafka installation directory, run:

```
$ ./bin/kafka-topics.sh --topic kafka-source-test --create --zookeeper localhost:2181 --partitions 6 --replication-factor 1
```

Please note that we create a topic with 6 partitions (you can use any number for testing, but we have chosen 6 for this example).

Start Spring XD in singlenode mode in the XD installation directory:

```
$ cd $XD_HOME
$ ./bin/xd-singlenode
```

Note that there are two Zookeeper instances running, one standalone when starting the Kafka broker and another that is embedded inside `xd-singlenode`. You can configure the port number that `xd-singlenode` will use for its embedded Zookeeper instance to match what your Kafka broker is expecting to use. By default the Kafka ZooKeeper host and port are configured to `localhost:2181`. To set the port number that `xd-singlenode` will use, set the environment variable `ZK_EMBEDDED_SERVER_PORT=2181`. Since we will be switching to XD distributed mode later in this example, it is better to use separate Zookeeper server instances.

Start the Spring XD shell.

Once the command is running, start typing at the console.
You should see the messages in the log, indicating that the Kafka source receives messages.

This has created a single-threaded consumer for the topic. Should you wish to increase concurrency in the consumer, you can use the `--streams` parameter as follows:

First, destroy the stream if necessary:

Then, deploy the stream with a higher concurrency setting for the Apache Kafka source:

Distributed mode
----------------

Simply increasing the concurrency in a single Kafka source is one way of improving performance, but does not use the capabilities of Apache Kafka to their full extent. Running concurrent clients on the same machine (with the same NIC) will rapidly saturate the receiving capabilities of the client.
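To illustrate what the `--streams` concurrency setting buys you, here is a hedged sketch (our own model, not Spring XD's or Kafka's actual assignment code) of how a topic's partitions get divided among consumer threads: with the 6 partitions created earlier and 3 streams, each stream owns 2 partitions, yet all of them still share one machine's NIC.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: divide a topic's partitions among N consumer streams (threads).
// The real assignment is done by the Kafka high-level consumer, not this code.
public class StreamAssignmentDemo {
    static Map<Integer, List<Integer>> assign(int partitions, int streams) {
        Map<Integer, List<Integer>> owners = new HashMap<>();
        for (int s = 0; s < streams; s++) {
            owners.put(s, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owners.get(p % streams).add(p);  // round-robin partition -> stream
        }
        return owners;
    }

    public static void main(String[] args) {
        // 6 partitions across 3 streams: each stream owns 2 partitions
        System.out.println(assign(6, 3));
    }
}
```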

A better way to improve the ingestion rate from Kafka is to create multiple Kafka sources in Spring XD, each running on a separate container, and each consuming a subset of the overall partition set of the inbound topic. Also, this strategy improves fault tolerance.

(Please note that for the sake of simplicity, this demo will use containers deployed on the same machine. Please refer to the Spring XD guide for details on how to run your containers on separate machines).

Start Apache ZooKeeper and Apache Kafka with default settings as described in the singlenode section.

Start Spring XD in distributed mode. Check the instructions for starting Spring XD in distributed mode [here](https://github.com/spring-projects/spring-xd/wiki/Running-Distributed-Mode), and make sure that you started the following, as well:

* HSQLDB
* Redis

Start the shell:

From the shell, create a sample stream again (destroy any existing stream if it exists):
xd> stream create kafka-source-test --definition "kafka --zkconnect=localhost:2181 --topic=kafka-source-test | log --expression=payload.concat(' on partition ').concat(headers['kafka__partitionId'].toString())" --deploy
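
The `--expression` in the definition above is SpEL, which here just calls `java.lang.String` methods on the payload. The following plain-Java sketch shows what the log sink ends up printing; the `partitionId` parameter is our stand-in for the `kafka__partitionId` message header.

```java
// Plain-Java equivalent of the SpEL expression
//   payload.concat(' on partition ').concat(headers['kafka__partitionId'].toString())
public class LogExpressionDemo {
    static String render(String payload, int partitionId) {
        return payload.concat(" on partition ").concat(Integer.toString(partitionId));
    }

    public static void main(String[] args) {
        // -> Message 96 at Tue Feb 10 13:20:23 EST 2015 on partition 0
        System.out.println(render("Message 96 at Tue Feb 10 13:20:23 EST 2015", 0));
    }
}
```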

In what comes next, we will instruct Spring XD to deploy the modules (both Kafka source and log) on multiple containers. This means that each available container will have a Kafka and a log module, and that they will be connected through direct channels, as described [here](https://github.com/spring-projects/spring-xd/wiki/Deployment#direct-binding). Execute the following command:

xd> stream deploy kafka-source-test --properties "module.*.count=3"

For our demo's purpose, this means that each log sink will output messages displayed by the Kafka source that runs in the same container. This way, we can monitor the partitions that each source listens to.

For the last step, we will compile and run the demo class that is attached to this project. It will send 1000 messages to the topic. The producer uses a simple Partitioner implementation that ensures that messages are evenly distributed across the partitions.

```
$ ./gradlew run
```
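The demo's actual partitioner is not shown here; the following is a minimal sketch of a round-robin partitioner with the same even-spreading behavior. It mirrors the `partition(Object key, int numPartitions)` contract of Kafka 0.8's `kafka.producer.Partitioner` interface, but as a standalone class so it runs without the Kafka jar (the class name and the round-robin strategy are our assumptions).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a round-robin partitioner: ignores the key and cycles through
// partitions 0..numPartitions-1, spreading messages evenly.
public class RoundRobinPartitioner {
    private final AtomicInteger counter = new AtomicInteger();

    public int partition(Object key, int numPartitions) {
        // getAndIncrement eventually wraps to negative values,
        // so use floorMod to keep the result in range
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinPartitioner partitioner = new RoundRobinPartitioner();
        for (int i = 0; i < 6; i++) {
            System.out.print(partitioner.partition(null, 6) + " ");
        }
        // prints: 0 1 2 3 4 5
    }
}
```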

Now, monitor the results in the container logs. You should see each container logging messages from a number of partitions (typically, 2 each), e.g.

    2015-02-10 13:20:23,600 1.1.0.SNAP INFO dispatcher-1 sink.kafka-source-test - Message 96 at Tue Feb 10 13:20:23 EST 2015 on partition 0
    2015-02-10 13:20:23,601 1.1.0.SNAP INFO dispatcher-1 sink.kafka-source-test - Message 97 at Tue Feb 10 13:20:23 EST 2015 on partition 1

Note that messages logged by this example have a structure of `Message #messageNumber at #timestamp on partition #partitionNumber`. This should help you identify the partitions that each source is listening to.

Now, shut down the container, and send messages again. You should see the partitions distributed between the remaining containers.

Now, start a fourth container and keep sending messages. You should see the existing containers receiving messages on their original partitions.

Next, restart the container, and keep sending messages. You should see the newly arrived container receiving messages again, from a couple of partitions, while the other containers will not receive messages from the same partition again.

Shut down one of the previous containers. You should see the new container picking up reception from the one that just left.

For a more interesting test, you can try running the message sending code in an infinite loop, and try starting and stopping containers.
2 changes: 1 addition & 1 deletion kafka-source/gradle/wrapper/gradle-wrapper.properties
#Mon Feb 09 15:21:07 EST 2015
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME