Commit 6080641

Author: Bharat Baddepudi

#528: Update README with Kafka producer and more fixes from testing.

Summary: Also includes:
- Removed iot-app example from README.
- Fix null value bind for non-existing columns.
- More sanity checks.
- Corner case checks for some data types.

Test Plan: Ran the following and ensured the table has the data.
```
./bin/connect-standalone ./etc/kafka/kafka.connect.properties ./etc/kafka-connect-yugabyte/yugabyte.sink.properties
```

Reviewers: mihnea
Reviewed By: mihnea
Subscribers: yql
Differential Revision: https://phabricator.dev.yugabyte.com/D5656

1 parent 1aa2027 commit 6080641

5 files changed: +164, -86 lines

README.md (+92, -59)
@@ -2,51 +2,65 @@

 This is the framework to connect Kafka streams to use [YugaByte DB](https://github.com/YugaByte/yugabyte-db) as the data sink using the [Kafka Connect](https://docs.confluent.io/3.0.0/connect/intro.html) mechanism.

-NOTE: This is a highly work in progress repo and the instructions/steps below can change.
-
 ## Prerequisites

-For building these projects it requires following tools. Please refer README.md files of individual projects for more details.
+For building and using this project, the following tools need to be pre-installed on the system.
 - JDK - 1.8+
 - Maven - 3.3+
-- Cassandra Core driver - 3.2.0
-- Confluent Kafka (we assume this is installed in the `~/code/confluent-os/confluent-5.0.0` directory).

 ## Steps to setup and run connect sink
-1. Download Confluent Open Source from https://www.confluent.io/download/. Unbundle the content of the tar.gz to say `~/code/confluent-os/confluent-5.0.0`.
+1. Download Confluent Open Source from https://www.confluent.io/download/. This is a manual step, as an email id is needed (as of Nov 2018).
+   Unbundle the content of the tar.gz to the location `~/yb-kafka/confluent-os/confluent-5.0.0` using these steps.
+   ```
+   mkdir -p ~/yb-kafka/confluent-os
+   cd ~/yb-kafka/confluent-os
+   tar -xvf confluent-5.0.0-2.11.tar.gz
+   ```

 2. Include the YugaByte DB sink into the registered connectors:
-   - Add the "yb-sink=kafka-connect-yugabyte/sink.properties" in bin/confluent file within the `declare -a connector_properties={` scope.
-   - Copy the sample kafka-sink properties file from this repo's `resources/examples/kafka.connector` to `~/code/confluent-os/confluent-5.0.0/etc/kafka-connect-yugabyte/`
-   - For now, set the following in the `~/code/confluent-os/confluent-5.0.0/etc/kafka/connect-standalone.properties` file
+   - Add `yb-sink=kafka-connect-yugabyte/kafka.sink.properties` to the `~/yb-kafka/confluent-os/confluent-5.0.0/bin/confluent` file in this scope:
 ```
-   key.converter.schemas.enable=false
-   value.converter.schemas.enable=false
+   declare -a connector_properties=(
+     ...
+     "yb-sink=kafka-connect-yugabyte/kafka.sink.properties"
+   )
 ```

 3. Include dependent jars into kafka connectors:
-   - Run 'mvn clean install -DskipTests' in the top-level directory in this repo and copy the jar file:
+   - Build the jar from this repo and copy it for use by Kafka:
+   ```
+   cd ~/yb-kafka/yb-kafka-connector/
+   mvn clean install -DskipTests
+   mkdir ~/yb-kafka/confluent-os/confluent-5.0.0/share/java/kafka-connect-yugabyte/
+   cp ~/yb-kafka/yb-kafka-connector/target/yb-kafka-connnector-1.0.0.jar ~/yb-kafka/confluent-os/confluent-5.0.0/share/java/kafka-connect-yugabyte/
 ```
-   mkdir ~/code/confluent-os/confluent-5.0.0/share/java/kafka-connect-yugabyte/; cp ~/code/yb-kafka-connector/target/yb-kafka-connnector-1.0.0.jar ~/code/confluent-os/confluent-5.0.0/share/java/kafka-connect-yugabyte/
+   - Download the dependent jars from the maven central repository using the following commands.
 ```
-   - Copy all the dependent jars from Cassandra into the same folder. Most of these can be downloaded from maven central repository. The final list of jars should look like this:
+   cd ~/yb-kafka/confluent-os/confluent-5.0.0/share/java/kafka-connect-yugabyte/
+   wget http://central.maven.org/maven2/io/netty/netty-all/4.1.25.Final/netty-all-4.1.25.Final.jar
+   wget http://central.maven.org/maven2/com/yugabyte/cassandra-driver-core/3.2.0-yb-18/cassandra-driver-core-3.2.0-yb-18.jar
+   wget http://central.maven.org/maven2/com/codahale/metrics/metrics-core/3.0.1/metrics-core-3.0.1.jar
 ```
-   -rw-r--r--@   85449 Oct 27  2013 metrics-core-3.0.1.jar
-   -rw-r--r--@ 3823147 Oct 27 15:18 netty-all-4.1.25.Final.jar
-   -rw-r--r--  1100520 Oct 29 11:18 cassandra-driver-core-3.2.0-yb-18.jar
-   -rw-r--r--    14934 Oct 29 11:19 yb-kafka-connnector-1.0.0.jar
+
+   The final list of jars should look like this:
+   ```
+   $ ls -al
+   -rw-r--r--@   85449 Oct 27  2013 metrics-core-3.0.1.jar
+   -rw-r--r--@ 3823147 Oct 27 15:18 netty-all-4.1.25.Final.jar
+   -rw-r--r--  1100520 Oct 29 11:18 cassandra-driver-core-3.2.0-yb-18.jar
+   -rw-r--r--    14934 Oct 29 11:19 yb-kafka-connnector-1.0.0.jar
 ```

-4. Do the following to run Kafka and related dependencies:
+4. Do the following to run Kafka and related components:
 ```
-   export PATH=$PATH:~/code/confluent-os/confluent-5.0.0/bin
+   export PATH=$PATH:~/yb-kafka/confluent-os/confluent-5.0.0/bin
    confluent start
    confluent stop connect
    confluent stop kafka-rest
    confluent status
 ```

-   The output for this should look like
+   The output of `confluent status` should look like
 ```
    control-center is [UP]
    ksql-server is [UP]
@@ -58,55 +72,74 @@
 ```

 5. Install YugaByte DB and create the keyspace/table.
-   - [Install YugaByte DB and start a local cluster](https://docs.yugabyte.com/quick-start/install/).
-   - Create the keyspace and table by running the following command. You can find `cqlsh` in the `bin` sub-directory located inside the YugaByte installation folder.
-   ```sh
-   cqlsh -f resources/examples/table.cql
-   ```
-
-6. Create a topic (one time)
+   - [Install YugaByte DB and start a local cluster](https://docs.yugabyte.com/quick-start/install/).
+   - Create a keyspace and table by running the following commands. You can find `cqlsh` in the `bin` sub-directory located inside the YugaByte installation folder.
+   ```sh
+   $> cqlsh
+   cqlsh> CREATE KEYSPACE IF NOT EXISTS demo;
+   cqlsh> CREATE TABLE demo.test_table (key text, value bigint, ts timestamp, PRIMARY KEY (key));
 ```
-   ~/code/confluent-os/confluent-5.0.0/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic iot-event
-   ```
-   and then run the sample Kafka producer java app (need to tweak this to not depend on the https://github.com/YugaByte/yb-iot-fleet-management repo):

+6. Kafka topic producer
+   Run the following to produce data in a test topic:
 ```
-   cd ~/code/yb-iot-fleet-management; java -jar iot-kafka-producer/target/iot-kafka-producer-1.0.0.jar
+   $ ~/yb-kafka/confluent-os/confluent-5.0.0/bin/kafka-console-producer --broker-list localhost:9092 --topic test_topic
 ```
-
-7. Run the Kafka Connect in standalone mode
+   Just cut-and-paste the following lines at the prompt:
 ```
-   cd ~/code/confluent-os/confluent-5.0.0; ./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-yugabyte/sink.properties
+   {"key" : "A", "value" : 1, "ts" : 20002000}
+   {"key" : "B", "value" : 2, "ts" : 20002020}
+   {"key" : "C", "value" : 3, "ts" : 20202020}
 ```
-   Note: In the connect-standalone need to set *.enable.schema to false.
+   Feel free to Ctrl-C this process or switch to a different shell, as more values can be added to the same topic later as well.

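Each line pasted at the producer prompt must be a standalone JSON object whose fields match the `demo.test_table` columns created in step 5. A quick Python sanity check of the sample records (illustrative only, not part of this repo):

```python
import json

# The three sample records pasted into kafka-console-producer above.
lines = [
    '{"key" : "A", "value" : 1, "ts" : 20002000}',
    '{"key" : "B", "value" : 2, "ts" : 20002020}',
    '{"key" : "C", "value" : 3, "ts" : 20202020}',
]

for line in lines:
    record = json.loads(line)  # each line must parse as JSON on its own
    # Field names must match the table columns: key (text), value (bigint), ts (timestamp).
    assert set(record) == {"key", "value", "ts"}
    # `value` is a bigint and `ts` is epoch milliseconds, so both must be integers.
    assert isinstance(record["value"], int) and isinstance(record["ts"], int)

print("all", len(lines), "records parse cleanly")  # → all 3 records parse cleanly
```

A malformed line (trailing comma, single quotes, missing field) would instead surface later as a sink-side conversion error, so checking the records up front is cheap insurance.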
-   You should see something like this (relevant lines from YBSinkTask.java):
-   ```
-   [2018-10-28 16:24:16,037] INFO Start with keyspace=iotdemo, table=ingress (com.yb.connect.sink.YBSinkTask:69)
-   [2018-10-28 16:24:16,054] INFO Connecting to nodes: /127.0.0.1:9042,/127.0.0.2:9042,/127.0.0.3:9042 (com.yb.connect.sink.YBSinkTask:149)
-   [2018-10-28 16:24:16,517] INFO Connected to cluster: cluster1 (com.yb.connect.sink.YBSinkTask:155)
-   [2018-10-28 16:24:16,594] INFO Processing 1 records from Kafka. (com.yb.connect.sink.YBSinkTask:419)
-   [2018-10-28 16:24:16,612] INFO Prepare SinkRecord ...
-   [2018-10-28 16:24:16,618] INFO Bind timestamp of type timestamp (com.yb.connect.sink.YBSinkTask:204)
-   ...
-   ```
+7. Setup and run the Kafka Connect Sink
+   - Setup the properties for the kafka server:
+   ```
+   cp ~/yb-kafka/yb-kafka-connector/resources/examples/kafka.connect.properties ~/yb-kafka/confluent-os/confluent-5.0.0/etc/kafka/
+   ```
+
+   *Note*: Setting `bootstrap.servers` to a remote host/port in the same file can help connect to any accessible existing Kafka cluster.
+
+   - Do the following to setup a yugabyte sink properties file:
+   ```
+   mkdir -p ~/yb-kafka/confluent-os/confluent-5.0.0/etc/kafka-connect-yugabyte
+   cp ~/yb-kafka/yb-kafka-connector/resources/examples/yugabyte.sink.properties ~/yb-kafka/confluent-os/confluent-5.0.0/etc/kafka-connect-yugabyte
+   ```
+
+   *Note*: The keyspace and tablename values in the yugabyte.sink.properties file should match the values in the cqlsh commands in step 5.
+   The topics value should match the topic name used for the producer in step 6.
+   Setting `yugabyte.cql.contact.points` to a non-local list of host/ports will help connect to any remote accessible existing YugaByte DB cluster.
+
+   - Finally, run the connect sink in standalone mode:
+   ```
+   cd ~/yb-kafka/confluent-os/confluent-5.0.0
+   ./bin/connect-standalone ./etc/kafka/kafka.connect.properties ./etc/kafka-connect-yugabyte/yugabyte.sink.properties
+   ```
+
+   You should see something like this (relevant lines from YBSinkTask.java) on the console:
+   ```
+   [2018-10-28 16:24:16,037] INFO Start with keyspace=demo, table=test_table (com.yb.connect.sink.YBSinkTask:69)
+   [2018-10-28 16:24:16,054] INFO Connecting to nodes: /127.0.0.1:9042,/127.0.0.2:9042,/127.0.0.3:9042 (com.yb.connect.sink.YBSinkTask:149)
+   [2018-10-28 16:24:16,517] INFO Connected to cluster: cluster1 (com.yb.connect.sink.YBSinkTask:155)
+   [2018-10-28 16:24:16,594] INFO Processing 3 records from Kafka. (com.yb.connect.sink.YBSinkTask:419)
+   [2018-10-28 16:24:16,602] INFO Insert INSERT INTO demo.test_table(key,ts,value) VALUES (?,?,?) (com.yb.connect.sink.YBSinkTask:417)
+   [2018-10-28 16:24:16,612] INFO Prepare SinkRecord ...
+   [2018-10-28 16:24:16,618] INFO Bind 'ts' of type timestamp (com.yb.connect.sink.YBSinkTask:204)
+   ...
+   ```

 8. Confirm that the rows are in the table using cqlsh.
 ```
-   cqlsh> select * from iotdemo.ingress limit 5;
-
-   vehicleid | routeid | vehicletype | longitude | latitude | timestamp | speed | fuellevel
-   --------------------------------------+----------+-------------+------------+-----------+---------------------------------+-------+-----------
-   cc83f207-f233-49f3-8474-3b5f88379b93 | Route-43 | Small Truck | -97.978294 | 35.816948 | 2018-10-28 16:24:51.000000+0000 | 45 | 37
-   0cb3908c-34a2-4642-8a22-9c330030d0d3 | Route-43 | Large Truck | -97.32471  | 35.677494 | 2018-10-28 16:24:51.000000+0000 | 68 | 35
-   6c6083c6-d05d-48a9-8119-10d7b348272d | Route-82 | 18 Wheeler  | -96.268425 | 34.652107 | 2018-10-28 16:24:51.000000+0000 | 75 | 13
-   d7f2f71a-4347-46a3-aa28-c4048328e7f5 | Route-82 | Large Truck | -96.2321   | 34.241295 | 2018-10-28 16:24:51.000000+0000 | 77 | 37
-   7af07ac6-0902-42ee-ad1c-657e96473dac | Route-37 | Small Truck | -95.0934   | 33.29403  | 2018-10-28 15:17:55.000000+0000 | 89 | 22
-
-   (5 rows)
-   cqlsh> select count(*) from iotdemo.ingress;
-   ...
+   cqlsh> select * from demo.test_table;
+
+    key | value | ts
+   -----+-------+---------------------------------
+      A |     1 | 1970-01-01 05:33:22.000000+0000
+      C |     3 | 1970-01-01 05:36:42.020000+0000
+      B |     2 | 1970-01-01 05:33:22.020000+0000
 ```
+   Note that the timestamp value gets printed in a human-readable date format automatically.

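The automatic rendering above follows from CQL `timestamp` columns storing milliseconds since the Unix epoch. A small Python sketch (illustrative only, not part of this repo) that reproduces the cqlsh output for the three sample `ts` values from step 6:

```python
from datetime import datetime, timedelta, timezone

# CQL `timestamp` columns hold milliseconds since the Unix epoch (UTC).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def cql_ts(epoch_ms: int) -> str:
    # Render the way cqlsh prints timestamps: microsecond precision, +0000 zone.
    dt = EPOCH + timedelta(milliseconds=epoch_ms)
    return dt.strftime("%Y-%m-%d %H:%M:%S.%f") + "+0000"

for ms in (20002000, 20002020, 20202020):
    print(ms, "->", cql_ts(ms))
# 20002000 -> 1970-01-01 05:33:22.000000+0000
# 20002020 -> 1970-01-01 05:33:22.020000+0000
# 20202020 -> 1970-01-01 05:36:42.020000+0000
```

These match the `demo.test_table` rows shown in step 8, confirming the sink bound `ts` as a millisecond timestamp rather than a plain bigint.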
 ## Future Work
 - Add more data types.
@@ -0,0 +1,15 @@
+# Sample file with Kafka server properties.
+
+# Parameters for use by (local) Kafka server.
+bootstrap.servers=localhost:9092
+offset.storage.file.filename=/tmp/yb.connect.offsets
+offset.flush.interval.ms=10000
+
+# Format of data in Kafka and how to translate it into YB connect sink data.
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.json.JsonConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+# Location of YB sink jar.
+plugin.path=share/java
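The two `schemas.enable=false` settings are what let step 6 feed bare JSON records: with schemas enabled, Kafka Connect's JsonConverter instead expects each message to carry a schema/payload envelope. A Python sketch of the difference (illustrative only, not part of this repo):

```python
import json

# With value.converter.schemas.enable=false (as above), each Kafka message
# value is the bare record:
bare = '{"key" : "A", "value" : 1, "ts" : 20002000}'

# With schemas.enable=true, JsonConverter would instead expect a
# schema/payload envelope around every record, along these lines:
enveloped = json.dumps({
    "schema": {"type": "struct", "fields": [
        {"field": "key", "type": "string"},
        {"field": "value", "type": "int64"},
        {"field": "ts", "type": "int64"},
    ]},
    "payload": {"key": "A", "value": 1, "ts": 20002000},
})

# Disabling schemas keeps producer input simple: the bare record is exactly
# what would otherwise sit under "payload".
assert json.loads(bare) == json.loads(enveloped)["payload"]
print("bare record matches envelope payload")
```

Disabling schemas is the right trade-off for hand-typed console-producer input; the envelope form matters only when per-record type information must travel with the data.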

resources/examples/table.cql (-8)

This file was deleted.
@@ -1,8 +1,10 @@
+# Sample yugabyte sink properties.
+
 name=yugabyte-sink
 connector.class=com.yb.connect.sink.YBSinkConnector
-tasks.max=1
-topics=iot-event
-yugabyte.cql.keyspace=iotdemo
-yugabyte.cql.tablename=ingress
-yugabyte.cql.contact.points=127.0.0.1:9042,127.0.0.2:9042,127.0.0.3:9042

+topics=test_topic
+
+yugabyte.cql.keyspace=demo
+yugabyte.cql.tablename=test_table
+yugabyte.cql.contact.points=127.0.0.1:9042,127.0.0.2:9042,127.0.0.3:9042
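These sink properties must stay consistent with the earlier steps: keyspace/table from step 5 and the topic from step 6. A minimal Python sketch (not part of this repo; assumes plain `key=value` lines with no escapes or continuations) that parses such a properties file and checks the values:

```python
# Minimal Java-style .properties parser (sketch; handles only simple
# key=value lines, skipping blanks and comments).
def parse_properties(text: str) -> dict:
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sink = parse_properties("""
# Sample yugabyte sink properties.
name=yugabyte-sink
connector.class=com.yb.connect.sink.YBSinkConnector
topics=test_topic
yugabyte.cql.keyspace=demo
yugabyte.cql.tablename=test_table
yugabyte.cql.contact.points=127.0.0.1:9042,127.0.0.2:9042,127.0.0.3:9042
""")

# Cross-check against step 5 (keyspace/table) and step 6 (topic).
assert sink["yugabyte.cql.keyspace"] == "demo"
assert sink["yugabyte.cql.tablename"] == "test_table"
print(sink["topics"])  # → test_topic
```

A mismatch in any of these three values is the most common reason the sink starts cleanly but never writes a row, so a check like this is worth running after editing the file.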
