Skip to content

Commit ccff817

Browse files
bbaddepudiBharat Baddepudi
authored and
Bharat Baddepudi
committed
Add Kafka Connect Sink to YugaByte DB.
Summary: First set of changes needed to convert the Kafka input sink record into YB CQL table rows. README has the list of steps and sample output. Test Plan: Run local Kafka topic and point to the local YB task sink. `cd ~/code/confluent-os/confluent-5.0.0; ./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-yugabyte/sink.properties` Saw 4 records in this format: ``` [2018-10-28 16:24:59,029] INFO Prepare SinkRecord{kafkaOffset=54, timestampType=CreateTime} ConnectRecord{topic='iot-data-event', kafkaPartition=0, key=null, value={routeId=Route-43, fuelLevel=35.0, latitude=35.677494, vehicleId=0cb3908c-34a2-4642-8a22-9c330030d0d3, vehicleType=Large Truck, speed=68.0, longitude=-97.32471, timestamp=2018-10-28 16:24:51}, timestamp=null, headers=ConnectHeaders(headers=)} Key/Schema=null/null Value/Schema={routeId=Route-43, fuelLevel=35.0, latitude=35.677494, vehicleId=0cb3908c-34a2-4642-8a22-9c330030d0d3, vehicleType=Large Truck, speed=68.0, longitude=-97.32471, timestamp=2018-10-28 16:24:51}/null (com.yb.connect.sink.YBSinkTask:390) ``` And checked that the table has the data: ``` cqlsh> select * from iotdemo.ingress limit 5; vehicleid | routeid | vehicletype | longitude | latitude | timestamp | speed | fuellevel --------------------------------------+----------+-------------+------------+-----------+---------------------------------+-------+----------- cc83f207-f233-49f3-8474-3b5f88379b93 | Route-43 | Small Truck | -97.978294 | 35.816948 | 2018-10-28 16:24:51.000000+0000 | 45 | 37 ... cqlsh> select count(*) from iotdemo.ingress; count ------- 83 ``` Reviewers: mihnea Subscribers: sid, kannan, ravi Differential Revision: https://phabricator.dev.yugabyte.com/D5620
1 parent 04675c7 commit ccff817

File tree

8 files changed

+754
-0
lines changed

8 files changed

+754
-0
lines changed

.arcconfig

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"phabricator.uri" : "https://phabricator.dev.yugabyte.com",
3+
"repository.callsign": "YB-Kafka-Connector",
4+
"git:arc.feature.start.default" : "upstream/master",
5+
"arc.land.onto.default": "master",
6+
"arc.feature.start.default" : "master"
7+
}

.gitignore

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
build
2+
logs
3+
target
4+
.idea
5+
.idea_modules
6+
/.classpath
7+
/.project
8+
/.settings
9+
/RUNNING_PID
10+
11+
# VIM/emacs stuff
12+
*.swp
13+
*~
14+
core*
15+
16+
perf.data
17+
perf.data.old
18+
oprofile_data
19+
.metadata/
20+
*.iml
21+
22+
app/.DS_STORE
23+
.DS_Store
24+
app/views/.DS_Store
25+
26+
node_modules/
27+
react/node_modules/
28+
*bundle.js
29+
react/public/bundle.js
30+
react/.idea/
31+
32+
npm-debug.log
33+
react/npm-debug.log
34+
35+
conf/*
36+
src/main/public/*
37+
*.pyc
38+
docker/yugaware/*.yml
39+
src/itest/static/*.log
40+
41+
ui/.env
42+
ui/public/map/
43+
44+
src/itest/python_virtual_env
45+
src/itest/web_app/appllication.conf
46+
.session.vim*

README.md

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Kafka Connect to YugaByte DB
2+
3+
This is the framework to connect Kafka streams to use [YugaByte DB](https://github.com/YugaByte/yugabyte-db) as the data sink using the [Kafka Connect](https://docs.confluent.io/3.0.0/connect/intro.html) mechanism.
4+
NOTE: This is a highly work in progress repo and the instructions/steps below can change.
5+
6+
## Prerequisites
7+
8+
For building these projects it requires following tools. Please refer README.md files of individual projects for more details.
9+
- JDK - 1.8+
10+
- Maven - 3.3+
11+
- Cassandra Core driver - 3.2.0
12+
- Confluent Kafka (we assume this is installed in the `~/code/confluent-os/confluent-5.0.0` directory).
13+
14+
## Steps to setup and run connect sink
15+
1. Download Confluent Open Source from https://www.confluent.io/download/. Unbundle the content of the tar.gz to say ```~/code/confluent-os/confluent-5.0.0```.
16+
17+
2. Include the YugaByte DB sink into the registered connectors:
18+
- Add the "yb-sink=kafka-connect-yugabyte/sink.properties" in bin/confluent file within the `declare -a connector_properties={` scope.
19+
- Copy the sample kafka-sink properties file from this repos `resources/examples/kafka.connector` to `~/code/confluent-os/confluent-5.0.0/etc/kafka-connect-yugabyte/`
20+
- For now, set the following in the ```~/code/confluent-os/confluent-5.0.0/etc/kafka/connect-standalone.properties``` file
21+
```key.converter.schemas.enable=false
22+
value.converter.schemas.enable=false```
23+
24+
3. Include dependent jar into kafka connectors:
25+
- Run 'mvn clean install -DskipTests' in the top-level directory in this repo and copy the jar file:
26+
```mkdir ~/code/confluent-os/confluent-5.0.0/share/java/kafka-connect-yugabyte/; cp ~/code/yb-kafka-connector/target/yb-kafka-connnector-1.0.0.jar ~/code/confluent-os/confluent-5.0.0/share/java/kafka-connect-yugabyte/```
27+
- Copy all the dependent jars from Cassandra into the same folder. Most of these can be downloaded from maven central repository. The final list of jars should look like this:
28+
```-rw-r--r--@ 85449 Oct 27 2013 metrics-core-3.0.1.jar
29+
-rw-r--r-- 1082699 Oct 26 00:16 cassandra-driver-core-3.2.0.jar
30+
-rw-r--r--@ 3823147 Oct 27 15:18 netty-all-4.1.25.Final.jar
31+
-rw-r--r-- 13917 Oct 27 15:41 yb-kafka-connnector-1.0.0.jar
32+
```
33+
34+
4. Do the following to run Kafka and related dependencies:
35+
export PATH=$PATH:~/code/confluent-os/confluent-5.0.0/bin
36+
confluent start
37+
confluent stop connect
38+
confluent stop kafka-rest
39+
confluent status
40+
41+
The output for this should look like
42+
```
43+
control-center is [UP]
44+
ksql-server is [UP]
45+
connect is [DOWN]
46+
kafka-rest is [DOWN]
47+
schema-registry is [UP]
48+
kafka is [UP]
49+
zookeeper is [UP]
50+
```
51+
52+
5. Install YugaByte DB and create the keyspace/table.
53+
- [Install YugaByte DB and start a local cluster](https://docs.yugabyte.com/quick-start/install/).
54+
- Create the keyspace and table by running the following command. You can find `cqlsh` in the `bin` sub-directory located inside the YugaByte installation folder.
55+
```sh
56+
cqlsh -f resources/examples/table.cql
57+
```
58+
59+
6. Create a topic (one time)
60+
```~/code/confluent-os/confluent-5.0.0/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic iot-data-event```
61+
and then run the sample Kafka producer java app (need to tweak this to not depend on the https://github.com/YugaByte/yb-iot-fleet-management repo):
62+
cd ~/code/yb-iot-fleet-management; java -jar iot-kafka-producer/target/iot-kafka-producer-1.0.0.jar
63+
64+
7. Run the Kafka Connect in standalone mode
65+
```cd ~/code/confluent-os/confluent-5.0.0; ./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-yugabyte/sink.properties```
66+
Note: In the connect-standalone need to set *.enable.schema to false.
67+
68+
You should see something like this (relevant lines from YBSinkTask.java):
69+
```
70+
[2018-10-28 16:24:16,037] INFO START iotdemo ingress (com.yb.connect.sink.YBSinkTask:69)
71+
[2018-10-28 16:24:16,054] INFO Connecting to nodes: /127.0.0.1:9042,/127.0.0.2:9042,/127.0.0.3:9042 (com.yb.connect.sink.YBSinkTask:149)
72+
[2018-10-28 16:24:16,517] INFO Connected to cluster: cluster1 (com.yb.connect.sink.YBSinkTask:155)
73+
[2018-10-28 16:24:16,594] INFO Processing 1 records from Kafka. (com.yb.connect.sink.YBSinkTask:419)
74+
[2018-10-28 16:24:16,612] INFO Prepare SinkRecord ...
75+
[2018-10-28 16:24:16,618] INFO Bind timestamp of type timestamp (com.yb.connect.sink.YBSinkTask:204)
76+
...
77+
```
78+
79+
8. Confirm that the rows are in the table using cqlsh.
80+
```cqlsh> select * from iotdemo.ingress limit 5;
81+
82+
vehicleid | routeid | vehicletype | longitude | latitude | timestamp | speed | fuellevel
83+
--------------------------------------+----------+-------------+------------+-----------+---------------------------------+-------+-----------
84+
cc83f207-f233-49f3-8474-3b5f88379b93 | Route-43 | Small Truck | -97.978294 | 35.816948 | 2018-10-28 16:24:51.000000+0000 | 45 | 37
85+
0cb3908c-34a2-4642-8a22-9c330030d0d3 | Route-43 | Large Truck | -97.32471 | 35.677494 | 2018-10-28 16:24:51.000000+0000 | 68 | 35
86+
6c6083c6-d05d-48a9-8119-10d7b348272d | Route-82 | 18 Wheeler | -96.268425 | 34.652107 | 2018-10-28 16:24:51.000000+0000 | 75 | 13
87+
d7f2f71a-4347-46a3-aa28-c4048328e7f5 | Route-82 | Large Truck | -96.2321 | 34.241295 | 2018-10-28 16:24:51.000000+0000 | 77 | 37
88+
7af07ac6-0902-42ee-ad1c-657e96473dac | Route-37 | Small Truck | -95.0934 | 33.29403 | 2018-10-28 15:17:55.000000+0000 | 89 | 22
89+
90+
(5 rows)
91+
cqlsh> select count(*) from iotdemo.ingress;
92+
...
93+
```
94+
95+
## Future Work
96+
- Add more data types.
97+
- Add more tests.
98+
- Add restartability.
99+
- Add YugaByte DB as a Connect Source.
100+
- Update README!
101+
102+
## License
103+
This software is distributed under an Apache 2.0 license. See the [LICENSE.txt](https://github.com/YugaByte/yb-kafka-connector/LICENSE) file for details.

pom.xml

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
// Copyright (c) YugaByte, Inc.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6+
// in compliance with the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software distributed under the License
11+
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
// or implied. See the License for the specific language governing permissions and limitations
13+
// under the License.
14+
-->
15+
16+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
17+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
18+
<modelVersion>4.0.0</modelVersion>
19+
<groupId>com.yb</groupId>
20+
<artifactId>yb-kafka-connnector</artifactId>
21+
<version>1.0.0</version>
22+
<name>Kafka Connect YB</name>
23+
24+
<description>
25+
Kafka Connect to YugaByte DB.
26+
</description>
27+
28+
29+
<dependencies>
30+
<!-- Kafka -->
31+
<dependency>
32+
<groupId>org.apache.kafka</groupId>
33+
<artifactId>kafka-streams</artifactId>
34+
<version>0.10.1.1</version>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.apache.kafka</groupId>
38+
<artifactId>connect-api</artifactId>
39+
<version>2.0.0</version>
40+
</dependency>
41+
42+
<!-- Cassandra -->
43+
<dependency>
44+
<groupId>com.datastax.cassandra</groupId>
45+
<artifactId>cassandra-driver-core</artifactId>
46+
<version>3.2.0</version>
47+
</dependency>
48+
49+
<!-- other -->
50+
<dependency>
51+
<groupId>log4j</groupId>
52+
<artifactId>log4j</artifactId>
53+
<version>1.2.17</version>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.slf4j</groupId>
57+
<artifactId>slf4j-log4j12</artifactId>
58+
<version>1.7.25</version>
59+
</dependency>
60+
<dependency>
61+
<groupId>junit</groupId>
62+
<artifactId>junit</artifactId>
63+
<version>4.12</version>
64+
</dependency>
65+
</dependencies>
66+
67+
<build>
68+
<plugins>
69+
<plugin>
70+
<groupId>org.apache.maven.plugins</groupId>
71+
<artifactId>maven-compiler-plugin</artifactId>
72+
<version>3.3</version>
73+
<configuration>
74+
<source>1.8</source>
75+
<target>1.8</target>
76+
</configuration>
77+
</plugin>
78+
</plugins>
79+
</build>
80+
81+
</project>

resources/examples/kafka.connector

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
name=yugabyte-sink
2+
connector.class=com.yb.connect.sink.YBSinkConnector
3+
tasks.max=1
4+
topics=iot-data-event
5+
yugabyte.cql.keyspace=iotdemo
6+
yugabyte.cql.tablename=ingress
7+
yugabyte.cql.contact.points=127.0.0.1:9042,127.0.0.2:9042,127.0.0.3:9042
8+

resources/examples/table.cql

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
DROP TABLE iotdemo.ingress;
2+
DROP KEYSPACE iotdemo;
3+
4+
// Create keyspace
5+
CREATE KEYSPACE IF NOT EXISTS iotdemo WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
6+
7+
// Create table
8+
CREATE TABLE iotdemo.ingress (vehicleId text, routeId text, vehicleType text, longitude text, latitude text, timeStamp timestamp, speed double, fuelLevel double, PRIMARY KEY (vehicleId));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright (c) YugaByte, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4+
// in compliance with the License. You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software distributed under the License
9+
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10+
// or implied. See the License for the specific language governing permissions and limitations
11+
// under the License.
12+
//
13+
14+
package com.yb.connect.sink;
15+
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.stream.IntStream;
20+
21+
import org.apache.kafka.common.config.ConfigDef;
22+
import org.apache.kafka.connect.connector.Task;
23+
import org.apache.kafka.connect.sink.SinkConnector;
24+
25+
/**
26+
* YugaByte Sink connector.
27+
*/
28+
public class YBSinkConnector extends SinkConnector {
29+
private static final String VERSION = "1";
30+
31+
private Map<String, String> properties;
32+
33+
@Override
34+
public String version() {
35+
return VERSION;
36+
}
37+
38+
@Override
39+
public void start(final Map<String, String> properties) {
40+
this.properties = properties;
41+
}
42+
43+
@Override
44+
public Class<? extends Task> taskClass() {
45+
return YBSinkTask.class;
46+
}
47+
48+
@Override
49+
public List<Map<String, String>> taskConfigs(final int maximumTasks) {
50+
final List<Map<String, String>> configs = new ArrayList<Map<String, String>>();
51+
IntStream.rangeClosed(1, maximumTasks).forEach((i) -> configs.add(properties));
52+
return configs;
53+
}
54+
55+
@Override
56+
public void stop() {
57+
}
58+
59+
@Override
60+
public ConfigDef config() {
61+
return new ConfigDef();
62+
}
63+
}

0 commit comments

Comments
 (0)