Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
Expand All @@ -45,11 +46,14 @@
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.File;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -62,8 +66,25 @@
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT1.class})
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {

@Before
public void setUp() {
MultiEnvFactory.createEnv(1);
env = MultiEnvFactory.getEnv(0);
env.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setPipeMemoryManagementEnabled(false)
.setDataReplicationFactor(1)
.setSchemaReplicationFactor(1)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
env.initClusterEnvironment(1, 1);
}

@Test
public void testOPCUAServerSink() throws Exception {
int tcpPort = -1;
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {

Expand All @@ -75,44 +96,82 @@ public void testOPCUAServerSink() throws Exception {
sinkAttributes.put("opcua.model", "client-server");
sinkAttributes.put("security-policy", "None");

final int[] ports = EnvUtils.searchAvailablePorts();
final int tcpPort = ports[0];
final int httpsPort = ports[1];
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("https.port", Integer.toString(httpsPort));
OpcUaClient opcUaClient;
DataValue value;
while (true) {
final int[] ports = EnvUtils.searchAvailablePorts();
tcpPort = ports[0];
final int httpsPort = ports[1];
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("https.port", Integer.toString(httpsPort));

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.singletonMap("user", "root"))
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.singletonMap("user", "root"))
.setProcessorAttributes(Collections.emptyMap()))
.getCode());

final OpcUaClient opcUaClient =
getOpcUaClient(
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
DataValue value =
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
try {
opcUaClient =
getOpcUaClient(
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
} catch (final PipeException e) {
if (e.getCause() instanceof ConnectException) {
continue;
} else {
throw e;
}
}
value =
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
opcUaClient.disconnect().get();
break;
}

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.alterPipe(
new TAlterPipeReq()
.setPipeName("testPipe")
.setIsReplaceAllConnectorAttributes(false)
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
.setProcessorAttributes(Collections.emptyMap())
.setExtractorAttributes(Collections.emptyMap()))
.getCode());
while (true) {
final int[] ports = EnvUtils.searchAvailablePorts();
tcpPort = ports[0];
final int httpsPort = ports[1];
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("https.port", Integer.toString(httpsPort));
sinkAttributes.put("with-quality", "true");

TestUtils.executeNonQuery(
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.alterPipe(
new TAlterPipeReq()
.setPipeName("testPipe")
.setIsReplaceAllConnectorAttributes(true)
.setConnectorAttributes(sinkAttributes)
.setProcessorAttributes(Collections.emptyMap())
.setExtractorAttributes(Collections.emptyMap()))
.getCode());
try {
opcUaClient =
getOpcUaClient(
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
} catch (final PipeException e) {
if (e.getCause() instanceof ConnectException) {
continue;
} else {
throw e;
}
}
break;
}

// Create aligned timeSeries to avoid tsFile parsing
TestUtils.executeNonQueries(
env,
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
Arrays.asList(
"create aligned timeSeries root.db.opc(value double, quality boolean, other int32)",
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)"),
null);

long startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -168,11 +227,12 @@ public void testOPCUAServerSink() throws Exception {
.getCode());

// Banned none, only allows basic256sha256
final int finalTcpPort = tcpPort;
Assert.assertThrows(
PipeException.class,
() ->
getOpcUaClient(
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
"opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb",
SecurityPolicy.None,
"root",
"root"));
Expand All @@ -188,6 +248,13 @@ public void testOPCUAServerSink() throws Exception {
"org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password **** conflicts to the new password ****, reject reusing.",
e.getMessage());
}
} finally {
if (tcpPort >= 0) {
final String lockPath = EnvUtils.getLockFilePath(tcpPort);
if (!new File(lockPath).delete()) {
System.out.printf("Delete lock file %s failed%n", lockPath);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ public void run() {
configurableUaClient.run(client);
} catch (final Exception e) {
throw new PipeException(
"Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
"Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
}
} catch (final Exception e) {
throw new PipeException(
"Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
"Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
}
}
}