|
24 | 24 | import org.apache.iotdb.rpc.IoTDBConnectionException; |
25 | 25 | import org.apache.iotdb.rpc.StatementExecutionException; |
26 | 26 | import org.apache.iotdb.session.pool.TableSessionPoolBuilder; |
| 27 | + |
27 | 28 | import org.apache.kafka.clients.consumer.ConsumerConfig; |
28 | 29 | import org.apache.kafka.clients.consumer.KafkaConsumer; |
29 | 30 | import org.apache.kafka.common.serialization.StringDeserializer; |
30 | 31 | import org.slf4j.Logger; |
31 | 32 | import org.slf4j.LoggerFactory; |
32 | 33 |
|
33 | | -import java.util.*; |
| 34 | +import java.util.ArrayList; |
| 35 | +import java.util.Arrays; |
| 36 | +import java.util.Collections; |
| 37 | +import java.util.List; |
| 38 | +import java.util.Properties; |
34 | 39 | import java.util.concurrent.ExecutorService; |
35 | 40 | import java.util.concurrent.Executors; |
36 | 41 |
|
37 | 42 | public class RelationalConsumer { |
38 | 43 |
|
39 | | - private static final Logger LOGGER = LoggerFactory.getLogger(RelationalConsumer.class); |
40 | | - private static ITableSessionPool tableSessionPool; |
41 | | - private List<KafkaConsumer<String, String>> consumerList; |
42 | | - |
43 | | - private RelationalConsumer(List<KafkaConsumer<String, String>> consumerList) { |
44 | | - this.consumerList = consumerList; |
45 | | - initSessionPool(); |
| 44 | + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalConsumer.class); |
| 45 | + private static ITableSessionPool tableSessionPool; |
| 46 | + private List<KafkaConsumer<String, String>> consumerList; |
| 47 | + |
| 48 | + private RelationalConsumer(List<KafkaConsumer<String, String>> consumerList) { |
| 49 | + this.consumerList = consumerList; |
| 50 | + initSessionPool(); |
| 51 | + } |
| 52 | + |
| 53 | + private static void initSessionPool() { |
| 54 | + tableSessionPool = |
| 55 | + new TableSessionPoolBuilder() |
| 56 | + .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS)) |
| 57 | + .user(RelationalConstant.IOTDB_USERNAME) |
| 58 | + .password(RelationalConstant.IOTDB_PASSWORD) |
| 59 | + .maxSize(RelationalConstant.SESSION_SIZE) |
| 60 | + .build(); |
| 61 | + } |
| 62 | + |
| 63 | + public static void main(String[] args) { |
| 64 | + List<KafkaConsumer<String, String>> consumerList = new ArrayList<>(); |
| 65 | + for (int i = 0; i < RelationalConstant.CONSUMER_THREAD_NUM; i++) { |
| 66 | + Properties props = new Properties(); |
| 67 | + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL); |
| 68 | + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 69 | + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 70 | + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
| 71 | + props.put(ConsumerConfig.GROUP_ID_CONFIG, RelationalConstant.TOPIC); |
| 72 | + |
| 73 | + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); |
| 74 | + consumerList.add(consumer); |
| 75 | + consumer.subscribe(Collections.singleton(RelationalConstant.TOPIC)); |
46 | 76 | } |
47 | | - |
48 | | - private static void initSessionPool() { |
49 | | - tableSessionPool = |
50 | | - new TableSessionPoolBuilder() |
51 | | - .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS)) |
52 | | - .user(RelationalConstant.IOTDB_USERNAME) |
53 | | - .password(RelationalConstant.IOTDB_PASSWORD) |
54 | | - .maxSize(RelationalConstant.SESSION_SIZE) |
55 | | - .build(); |
56 | | - } |
57 | | - |
58 | | - public static void main(String[] args) { |
59 | | - List<KafkaConsumer<String, String>> consumerList = new ArrayList<>(); |
60 | | - for (int i = 0; i < RelationalConstant.CONSUMER_THREAD_NUM; i++) { |
61 | | - Properties props = new Properties(); |
62 | | - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL); |
63 | | - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
64 | | - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
65 | | - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
66 | | - props.put(ConsumerConfig.GROUP_ID_CONFIG, RelationalConstant.TOPIC); |
67 | | - |
68 | | - KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); |
69 | | - consumerList.add(consumer); |
70 | | - consumer.subscribe(Collections.singleton(RelationalConstant.TOPIC)); |
71 | | - } |
72 | | - RelationalConsumer consumer = new RelationalConsumer(consumerList); |
73 | | - initIoTDB(); |
74 | | - consumer.consumeInParallel(); |
| 77 | + RelationalConsumer consumer = new RelationalConsumer(consumerList); |
| 78 | + initIoTDB(); |
| 79 | + consumer.consumeInParallel(); |
| 80 | + } |
| 81 | + |
| 82 | + private static void initIoTDB() { |
| 83 | + for (String db : RelationalConstant.DATABASES) { |
| 84 | + boolean res = createDatabase(db); |
| 85 | + if (!res) { |
| 86 | + throw new RuntimeException("Create database failed"); |
| 87 | + } |
75 | 88 | } |
76 | | - |
77 | | - private static void initIoTDB() { |
78 | | - for (String db : RelationalConstant.DATABASES) { |
79 | | - boolean res = createDatabase(db); |
80 | | - if (!res) { |
81 | | - throw new RuntimeException("Create database failed"); |
82 | | - } |
83 | | - } |
84 | | - for (String[] tableInfo : RelationalConstant.TABLES) { |
85 | | - boolean res = createTable(tableInfo); |
86 | | - if (!res) { |
87 | | - throw new RuntimeException("Create table failed"); |
88 | | - } |
89 | | - } |
| 89 | + for (String[] tableInfo : RelationalConstant.TABLES) { |
| 90 | + boolean res = createTable(tableInfo); |
| 91 | + if (!res) { |
| 92 | + throw new RuntimeException("Create table failed"); |
| 93 | + } |
90 | 94 | } |
91 | | - |
92 | | - private static boolean createDatabase(String dbName) { |
93 | | - try (ITableSession session = tableSessionPool.getSession()) { |
94 | | - try { |
95 | | - session.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName)); |
96 | | - } catch (IoTDBConnectionException | StatementExecutionException e) { |
97 | | - LOGGER.error("Create Database Error: ", e); |
98 | | - return false; |
99 | | - } |
100 | | - } catch (IoTDBConnectionException e) { |
101 | | - LOGGER.error("Get Table Session Error: ", e); |
102 | | - return false; |
103 | | - } |
104 | | - return true; |
| 95 | + } |
| 96 | + |
| 97 | + private static boolean createDatabase(String dbName) { |
| 98 | + try (ITableSession session = tableSessionPool.getSession()) { |
| 99 | + try { |
| 100 | + session.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName)); |
| 101 | + } catch (IoTDBConnectionException | StatementExecutionException e) { |
| 102 | + LOGGER.error("Create Database Error: ", e); |
| 103 | + return false; |
| 104 | + } |
| 105 | + } catch (IoTDBConnectionException e) { |
| 106 | + LOGGER.error("Get Table Session Error: ", e); |
| 107 | + return false; |
105 | 108 | } |
106 | | - |
107 | | - private static boolean createTable(String[] tableInfo) { |
108 | | - try (ITableSession session = tableSessionPool.getSession()) { |
109 | | - String sql = getCreateTableSQL(tableInfo); |
110 | | - try { |
111 | | - session.executeNonQueryStatement(sql); |
112 | | - } catch (IoTDBConnectionException | StatementExecutionException e) { |
113 | | - LOGGER.error("Create Table Error: ", e); |
114 | | - return false; |
115 | | - } |
116 | | - } catch (IoTDBConnectionException e) { |
117 | | - LOGGER.error("Get Table Session Error: ", e); |
118 | | - return false; |
119 | | - } |
120 | | - return true; |
| 109 | + return true; |
| 110 | + } |
| 111 | + |
| 112 | + private static boolean createTable(String[] tableInfo) { |
| 113 | + try (ITableSession session = tableSessionPool.getSession()) { |
| 114 | + String sql = getCreateTableSQL(tableInfo); |
| 115 | + try { |
| 116 | + session.executeNonQueryStatement(sql); |
| 117 | + } catch (IoTDBConnectionException | StatementExecutionException e) { |
| 118 | + LOGGER.error("Create Table Error: ", e); |
| 119 | + return false; |
| 120 | + } |
| 121 | + } catch (IoTDBConnectionException e) { |
| 122 | + LOGGER.error("Get Table Session Error: ", e); |
| 123 | + return false; |
121 | 124 | } |
122 | | - |
123 | | - private static String getCreateTableSQL(String[] tableInfo) { |
124 | | - StringBuilder sql = new StringBuilder(); |
125 | | - sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" ("); |
126 | | - |
127 | | - String[] columnNames = tableInfo[2].split(","); |
128 | | - String[] columnTypes = tableInfo[3].split(","); |
129 | | - String[] columnCategories = tableInfo[4].split(","); |
130 | | - int columnSize = columnNames.length; |
131 | | - |
132 | | - for (int i = 0; i < columnSize; i++) { |
133 | | - sql.append(columnNames[i]).append(" "); |
134 | | - sql.append(columnTypes[i]).append(" "); |
135 | | - sql.append(columnCategories[i]).append(","); |
136 | | - } |
137 | | - sql.deleteCharAt(sql.length() - 1); |
138 | | - sql.append(")"); |
139 | | - return sql.toString(); |
| 125 | + return true; |
| 126 | + } |
| 127 | + |
| 128 | + private static String getCreateTableSQL(String[] tableInfo) { |
| 129 | + StringBuilder sql = new StringBuilder(); |
| 130 | + sql.append("CREATE TABLE \"") |
| 131 | + .append(tableInfo[0]) |
| 132 | + .append("\".\"") |
| 133 | + .append(tableInfo[1]) |
| 134 | + .append("\" ("); |
| 135 | + |
| 136 | + String[] columnNames = tableInfo[2].split(","); |
| 137 | + String[] columnTypes = tableInfo[3].split(","); |
| 138 | + String[] columnCategories = tableInfo[4].split(","); |
| 139 | + int columnSize = columnNames.length; |
| 140 | + |
| 141 | + for (int i = 0; i < columnSize; i++) { |
| 142 | + sql.append(columnNames[i]).append(" "); |
| 143 | + sql.append(columnTypes[i]).append(" "); |
| 144 | + sql.append(columnCategories[i]).append(","); |
140 | 145 | } |
141 | | - |
142 | | - private void consumeInParallel() { |
143 | | - ExecutorService executor = Executors.newFixedThreadPool(RelationalConstant.CONSUMER_THREAD_NUM); |
144 | | - for (int i = 0; i < consumerList.size(); i++) { |
145 | | - RelationalConsumerThread consumerThread = new RelationalConsumerThread(consumerList.get(i), tableSessionPool); |
146 | | - executor.submit(consumerThread); |
147 | | - } |
| 146 | + sql.deleteCharAt(sql.length() - 1); |
| 147 | + sql.append(")"); |
| 148 | + return sql.toString(); |
| 149 | + } |
| 150 | + |
| 151 | + private void consumeInParallel() { |
| 152 | + ExecutorService executor = Executors.newFixedThreadPool(RelationalConstant.CONSUMER_THREAD_NUM); |
| 153 | + for (int i = 0; i < consumerList.size(); i++) { |
| 154 | + RelationalConsumerThread consumerThread = |
| 155 | + new RelationalConsumerThread(consumerList.get(i), tableSessionPool); |
| 156 | + executor.submit(consumerThread); |
148 | 157 | } |
| 158 | + } |
149 | 159 | } |
0 commit comments