**`examples/kafka/readme.md`** (95 additions, 35 deletions)

This example shows how to send data from localhost to IoTDB through Kafka.
## Usage
### Version usage

| | Version |
|-------|---------|
| IoTDB | 2.0.5 |
| Kafka | 2.8.2 |

### Dependencies with Maven

```
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>2.8.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-session</artifactId>
    <version>2.0.5</version>
  </dependency>
</dependencies>
```

### Prerequisite Steps

#### 1. Install IoTDB
Please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/#/Download).

#### 2. Install Kafka
Please refer to [https://kafka.apache.org/downloads](https://kafka.apache.org/downloads).

#### 3. Start IoTDB
Please refer to [Quick Start](https://iotdb.apache.org/UserGuide/latest/QuickStart/QuickStart_apache.html).

#### 4. Start Kafka
Please refer to [https://kafka.apache.org/quickstart](https://kafka.apache.org/quickstart).


### Case 1: Send data from localhost to IoTDB-tree

Files related:
1. `Constant.java` : configuration of IoTDB and Kafka
2. `Producer.java` : send data from localhost to Kafka cluster
3. `Consumer.java` : consume data from Kafka cluster through multi-threads
4. `ConsumerThread.java` : consume operations done by single thread

Step 0: Set the parameters in `Constant.java`

> Change the parameters according to your situation.

| Parameter | Data Type | Description |
|---------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TOPIC | String | The topic to store data in Kafka |
| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. `"127.0.0.1:9092"` |
| CONSUMER_THREAD_NUM | int | The number of consumer threads |
| SESSION_SIZE | int | The maximum number of IoTDB sessions |
| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. `"localhost"` |
| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. `6667` |
| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. `"root"` |
| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. `"root"` |
| STORAGE_GROUP | Array | The storage groups to create |
| CREATE_TIMESERIES | Array | The timeseries to create <br/> Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"} <br/> e.g. `{"root.vehicle.d0.s0", "INT32", "PLAIN", "SNAPPY"}` |
| ALL_DATA | Array | The data to create <br/> Format of a single data entry: "device,timestamp,fieldName\[:fieldName\]\*,dataType\[:dataType\]\*,value\[:value\]\*" <br/> e.g. `"root.vehicle.d0,10,s0,INT32,100"`, `"root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'"` |
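The `ALL_DATA` line format can be parsed with plain string splitting. A minimal sketch, assuming illustrative names (`TreeDataLineParser` and `DataLine` are not part of the example code):

```java
import java.util.Arrays;
import java.util.List;

public class TreeDataLineParser {

  /** Hypothetical holder for one parsed ALL_DATA line (Case 1 format). */
  public static final class DataLine {
    public final String device;
    public final long timestamp;
    public final List<String> fields;
    public final List<String> dataTypes;
    public final List<String> values;

    DataLine(String device, long timestamp, List<String> fields,
        List<String> dataTypes, List<String> values) {
      this.device = device;
      this.timestamp = timestamp;
      this.fields = fields;
      this.dataTypes = dataTypes;
      this.values = values;
    }
  }

  // "device,timestamp,fieldName[:fieldName]*,dataType[:dataType]*,value[:value]*"
  public static DataLine parse(String line) {
    String[] parts = line.split(",");
    if (parts.length != 5) {
      throw new IllegalArgumentException("Expected 5 comma-separated parts: " + line);
    }
    return new DataLine(
        parts[0],
        Long.parseLong(parts[1]),
        Arrays.asList(parts[2].split(":")),
        Arrays.asList(parts[3].split(":")),
        Arrays.asList(parts[4].split(":")));
  }
}
```

Fields, types, and values use `:` as the inner separator, so a comma split followed by colon splits recovers all three lists.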

Step 1: Run `Producer.java`

> This class sends data from localhost to Kafka clusters.

Step 2: Run `Consumer.java`

> This class consumes data from Kafka through multi-threads and sends the data to IoTDB-tree.
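The multi-threaded consumption is the standard fixed-pool pattern: one Kafka consumer per thread, all submitted to an `ExecutorService`. A standalone sketch of that pattern, with the Kafka poll loop replaced by a placeholder (class name and constant value are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerPoolSketch {

  // Mirrors Constant.CONSUMER_THREAD_NUM; the value is illustrative.
  static final int CONSUMER_THREAD_NUM = 3;

  /** Submits one task per consumer thread and waits for all of them to run. */
  public static int runAll() {
    ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_THREAD_NUM);
    CountDownLatch done = new CountDownLatch(CONSUMER_THREAD_NUM);
    for (int i = 0; i < CONSUMER_THREAD_NUM; i++) {
      executor.submit(() -> {
        // In the real example this is a KafkaConsumer poll loop that
        // writes each record to IoTDB through a session.
        done.countDown();
      });
    }
    try {
      boolean finished = done.await(5, TimeUnit.SECONDS);
      executor.shutdown();
      return finished ? CONSUMER_THREAD_NUM : -1;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return -1;
    }
  }
}
```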

### Case 2: Send data from localhost to IoTDB-table

Files related:
1. `RelationalConstant.java` : configuration of IoTDB and Kafka
2. `RelationalProducer.java` : send data from localhost to Kafka cluster
3. `RelationalConsumer.java` : consume data from Kafka cluster through multi-threads
4. `RelationalConsumerThread.java` : consume operations done by single thread

Step 0: Set the parameters in `RelationalConstant.java`

> Change the parameters according to your situation.

| Parameter | Data Type | Description |
|---------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TOPIC | String | The topic to store data in Kafka |
| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. `"127.0.0.1:9092"` |
| CONSUMER_THREAD_NUM | int | The number of consumer threads |
| SESSION_SIZE | int | The maximum number of IoTDB sessions |
| IOTDB_URLS | Array | IoTDB urls, e.g. `{"localhost:6667"}` |
| IOTDB_USERNAME | String | IoTDB username, e.g. `"root"` |
| IOTDB_PASSWORD | String | IoTDB password, e.g. `"root"` |
| DATABASES | Array | The databases to create |
| TABLES | Array | The tables to create <br/> Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"} <br/> e.g. `{"kafka_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"}` |
| ALL_DATA | Array | The data to create <br/> Format of a single data entry: "database;tableName;columnName\[,columnName\]\*;value\[,value\]\*\[;value\[,value\]\*\]\*" <br/> e.g. `"kafka_db1;tb1;time,status;17,true;18,false;19,true"` |
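The relational `ALL_DATA` format nests two separators: `;` between sections and rows, `,` within a row. A minimal parsing sketch under illustrative names (`RelationalDataLineParser` and `RelationalLine` are not part of the example code):

```java
import java.util.ArrayList;
import java.util.List;

public class RelationalDataLineParser {

  /** Hypothetical holder for one parsed ALL_DATA line (Case 2 format). */
  public static final class RelationalLine {
    public final String database;
    public final String table;
    public final String[] columns;
    public final List<String[]> rows;

    RelationalLine(String database, String table, String[] columns, List<String[]> rows) {
      this.database = database;
      this.table = table;
      this.columns = columns;
      this.rows = rows;
    }
  }

  // "database;tableName;columnName[,columnName]*;value[,value]*[;value[,value]*]*"
  public static RelationalLine parse(String line) {
    String[] parts = line.split(";");
    if (parts.length < 4) {
      throw new IllegalArgumentException("Expected at least 4 semicolon-separated parts: " + line);
    }
    String[] columns = parts[2].split(",");
    List<String[]> rows = new ArrayList<>();
    for (int i = 3; i < parts.length; i++) {
      String[] row = parts[i].split(",");
      if (row.length != columns.length) {
        throw new IllegalArgumentException("Row/column count mismatch in: " + parts[i]);
      }
      rows.add(row);
    }
    return new RelationalLine(parts[0], parts[1], columns, rows);
  }
}
```

One line can carry several rows for the same table, which is why everything after the column list is treated as a row.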

Step 1: Run `RelationalProducer.java`

> This class sends data from localhost to Kafka clusters.

Step 2: Run `RelationalConsumer.java`

> This class consumes data from Kafka through multi-threads and sends the data to IoTDB-table.

### Notice
If you want to use multiple consumers, make sure the topic you create has more than one partition.
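This follows from Kafka's group semantics: each partition is assigned to at most one consumer in a group, so once partitions run out, extra consumers sit idle. A round-robin sketch of that counting argument (illustrative only; the broker's actual partition assignor may distribute differently):

```java
public class PartitionAssignmentSketch {

  /** Round-robin count of how many partitions each consumer index would receive. */
  public static int[] partitionsPerConsumer(int partitions, int consumers) {
    int[] counts = new int[consumers];
    for (int p = 0; p < partitions; p++) {
      counts[p % consumers]++;
    }
    return counts;
  }
}
```

With one partition and three consumers, two of the three receive nothing.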
**`RelationalConstant.java`** (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.kafka.relational;

public class RelationalConstant {

  public static final String KAFKA_SERVICE_URL = "172.20.31.71:9094";
  public static final String TOPIC = "Kafka-Relational-Test";
  public static final String[] IOTDB_URLS = {"127.0.0.1:6667"};
  public static final String IOTDB_USERNAME = "root";
  public static final String IOTDB_PASSWORD = "root";
  public static final int SESSION_SIZE = 3;
  public static final int CONSUMER_THREAD_NUM = 5;
  public static final String[] DATABASES = {"kafka_db1", "kafka_db2"};
  public static final String[][] TABLES = {
    // database, tableName, columnNames, columnTypes, columnCategories
    {
      "kafka_db1", "tb1",
      "time,region,model_id,temperature,status",
      "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN",
      "TIME,TAG,ATTRIBUTE,FIELD,FIELD"
    },
    {
      "kafka_db2", "tb2",
      "time,plant_id,humidity,status",
      "TIMESTAMP,STRING,FLOAT,BOOLEAN",
      "TIME,TAG,FIELD,FIELD"
    }
  };
  public static final String[] ALL_DATA = {
    // database;tableName;columnName[,columnName]*;value[,value]*[;value[,value]*]*
    "kafka_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
    "kafka_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
    "kafka_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
    "kafka_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
    "kafka_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
    "kafka_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
    "kafka_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
  };
}
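Each entry of `RelationalConstant.TABLES` must keep its three comma-separated lists (column names, types, categories) the same length; otherwise the consumer's `CREATE TABLE` builder would misalign columns. A small illustrative checker (the class name is hypothetical):

```java
public class TableDefinitionCheck {

  // tableInfo = {database, tableName, columnNames, columnTypes, columnCategories}
  public static boolean isConsistent(String[] tableInfo) {
    if (tableInfo.length != 5) {
      return false;
    }
    int names = tableInfo[2].split(",").length;
    int types = tableInfo[3].split(",").length;
    int categories = tableInfo[4].split(",").length;
    return names == types && names == categories;
  }
}
```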
**`RelationalConsumer.java`** (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.kafka.relational;

import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.pool.ITableSessionPool;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.TableSessionPoolBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RelationalConsumer {

  private static final Logger LOGGER = LoggerFactory.getLogger(RelationalConsumer.class);
  private static ITableSessionPool tableSessionPool;
  private final List<KafkaConsumer<String, String>> consumerList;

  private RelationalConsumer(List<KafkaConsumer<String, String>> consumerList) {
    this.consumerList = consumerList;
    initSessionPool();
  }

  private static void initSessionPool() {
    tableSessionPool =
        new TableSessionPoolBuilder()
            .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
            .user(RelationalConstant.IOTDB_USERNAME)
            .password(RelationalConstant.IOTDB_PASSWORD)
            .maxSize(RelationalConstant.SESSION_SIZE)
            .build();
  }

  public static void main(String[] args) {
    List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
    for (int i = 0; i < RelationalConstant.CONSUMER_THREAD_NUM; i++) {
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL);
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, RelationalConstant.TOPIC);

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumerList.add(consumer);
      consumer.subscribe(Collections.singleton(RelationalConstant.TOPIC));
    }
    RelationalConsumer consumer = new RelationalConsumer(consumerList);
    initIoTDB();
    consumer.consumeInParallel();
  }

  private static void initIoTDB() {
    for (String db : RelationalConstant.DATABASES) {
      boolean res = createDatabase(db);
      if (!res) {
        throw new RuntimeException("Create database failed");
      }
    }
    for (String[] tableInfo : RelationalConstant.TABLES) {
      boolean res = createTable(tableInfo);
      if (!res) {
        throw new RuntimeException("Create table failed");
      }
    }
  }

  private static boolean createDatabase(String dbName) {
    try (ITableSession session = tableSessionPool.getSession()) {
      try {
        session.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName));
      } catch (IoTDBConnectionException | StatementExecutionException e) {
        LOGGER.error("Create Database Error: ", e);
        return false;
      }
    } catch (IoTDBConnectionException e) {
      LOGGER.error("Get Table Session Error: ", e);
      return false;
    }
    return true;
  }

  private static boolean createTable(String[] tableInfo) {
    try (ITableSession session = tableSessionPool.getSession()) {
      String sql = getCreateTableSQL(tableInfo);
      try {
        session.executeNonQueryStatement(sql);
      } catch (IoTDBConnectionException | StatementExecutionException e) {
        LOGGER.error("Create Table Error: ", e);
        return false;
      }
    } catch (IoTDBConnectionException e) {
      LOGGER.error("Get Table Session Error: ", e);
      return false;
    }
    return true;
  }

  private static String getCreateTableSQL(String[] tableInfo) {
    StringBuilder sql = new StringBuilder();
    sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"")
        .append(tableInfo[1]).append("\" (");

    String[] columnNames = tableInfo[2].split(",");
    String[] columnTypes = tableInfo[3].split(",");
    String[] columnCategories = tableInfo[4].split(",");
    int columnSize = columnNames.length;

    for (int i = 0; i < columnSize; i++) {
      sql.append(columnNames[i]).append(" ");
      sql.append(columnTypes[i]).append(" ");
      sql.append(columnCategories[i]).append(",");
    }
    sql.deleteCharAt(sql.length() - 1);
    sql.append(")");
    return sql.toString();
  }

  private void consumeInParallel() {
    ExecutorService executor = Executors.newFixedThreadPool(RelationalConstant.CONSUMER_THREAD_NUM);
    for (int i = 0; i < consumerList.size(); i++) {
      RelationalConsumerThread consumerThread =
          new RelationalConsumerThread(consumerList.get(i), tableSessionPool);
      executor.submit(consumerThread);
    }
  }
}
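The string-building logic of `getCreateTableSQL` can be reproduced standalone to inspect the exact SQL it emits for the first `TABLES` entry (the class name here is illustrative):

```java
public class CreateTableSqlSketch {

  /** Same string-building logic as getCreateTableSQL in RelationalConsumer. */
  public static String build(String[] tableInfo) {
    StringBuilder sql = new StringBuilder();
    sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"")
        .append(tableInfo[1]).append("\" (");
    String[] names = tableInfo[2].split(",");
    String[] types = tableInfo[3].split(",");
    String[] categories = tableInfo[4].split(",");
    for (int i = 0; i < names.length; i++) {
      sql.append(names[i]).append(" ").append(types[i]).append(" ")
          .append(categories[i]).append(",");
    }
    sql.deleteCharAt(sql.length() - 1); // drop the trailing comma
    sql.append(")");
    return sql.toString();
  }
}
```

For `{"kafka_db1", "tb1", …}` this yields one column definition per name/type/category triple, wrapped in a quoted `database.table` identifier.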