diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java index 5986e804..74b3ad0c 100644 --- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java +++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java @@ -77,8 +77,15 @@ public void open(Configuration parameters) throws Exception { } void initSession() { - pool = - new SessionPool( + if (options.getNodeUrls() != null) { + pool = new SessionPool( + options.getNodeUrls(), + options.getUser(), + options.getPassword(), + sessionPoolSize); + return; + } + pool = new SessionPool( options.getHost(), options.getPort(), options.getUser(), diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java index f62d7490..57c81f6a 100644 --- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java +++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java @@ -19,6 +19,7 @@ package org.apache.iotdb.flink.options; import java.io.Serializable; +import java.util.List; public class IoTDBOptions implements Serializable { @@ -26,6 +27,7 @@ public class IoTDBOptions implements Serializable { protected int port; protected String user; protected String password; + protected List nodeUrls; public IoTDBOptions(String host, int port, String user, String password) { this.host = host; @@ -34,6 +36,12 @@ public IoTDBOptions(String host, int port, String user, String password) { this.password = password; } + public IoTDBOptions(List nodeUrls, String user, String password) { + this.nodeUrls = nodeUrls; + this.user = user; + this.password = password; + } + public IoTDBOptions() {} public String getHost() { @@ -67,4 +75,12 @@ public String getPassword() { public void setPassword(String password) { this.password = password; } + + public List getNodeUrls() { + return nodeUrls; + } + + public void setNodeUrls(List nodeUrls) { + this.nodeUrls = nodeUrls; + } } diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java index c2a6917b..347eee95 100644 --- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java +++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java @@ -42,6 +42,15 @@ public IoTDBSinkOptions( this.timeseriesOptionList = timeseriesOptionList; } + public IoTDBSinkOptions( + List nodeUrls, + String user, + String password, + List timeseriesOptionList) { + super(nodeUrls, user, password); + this.timeseriesOptionList = timeseriesOptionList; + } + public List getTimeseriesOptionList() { return timeseriesOptionList; } diff --git a/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java b/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java index bd592d28..0b3f1de2 100644 --- a/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java +++ b/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java @@ -25,10 +25,12 @@ import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -113,4 +115,13 @@ public void close() throws Exception { any(List.class), any(List.class), any(List.class), any(List.class), any(List.class)); verify(pool).close(); } + + @Test + public void testInitUrlNodes() { + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.2:6667"); + IoTDBSinkOptions sinkOptions = new IoTDBSinkOptions(nodeUrls, null, null, null); + assertEquals(sinkOptions.getNodeUrls(), nodeUrls); + } }