diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 4c04b793197b..d929f747ebb8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner; import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient; +import org.apache.iotdb.it.env.MultiEnvFactory; import org.apache.iotdb.it.env.cluster.EnvUtils; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT1; @@ -45,11 +46,14 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import java.io.File; +import java.net.ConnectException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -62,8 +66,25 @@ @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT1.class}) public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT { + + @Before + public void setUp() { + MultiEnvFactory.createEnv(1); + env = MultiEnvFactory.getEnv(0); + env.getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setPipeMemoryManagementEnabled(false) + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1) + .setIsPipeEnableMemoryCheck(false) + .setPipeAutoSplitFullEnabled(false); + env.initClusterEnvironment(1, 1); + } + @Test public void testOPCUAServerSink() throws Exception { + int tcpPort = -1; try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) { @@ -75,44 +96,82 @@ public void testOPCUAServerSink() throws Exception { sinkAttributes.put("opcua.model", "client-server"); sinkAttributes.put("security-policy", "None"); - final int[] ports = EnvUtils.searchAvailablePorts(); - final int tcpPort = ports[0]; - final int httpsPort = ports[1]; - sinkAttributes.put("tcp.port", Integer.toString(tcpPort)); - sinkAttributes.put("https.port", Integer.toString(httpsPort)); + OpcUaClient opcUaClient; + DataValue value; + while (true) { + final int[] ports = EnvUtils.searchAvailablePorts(); + tcpPort = ports[0]; + final int httpsPort = ports[1]; + sinkAttributes.put("tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("https.port", Integer.toString(httpsPort)); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .createPipe( - new TCreatePipeReq("testPipe", sinkAttributes) - .setExtractorAttributes(Collections.singletonMap("user", "root")) - .setProcessorAttributes(Collections.emptyMap())) - .getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(Collections.singletonMap("user", "root")) + .setProcessorAttributes(Collections.emptyMap())) + .getCode()); - final OpcUaClient opcUaClient = - getOpcUaClient( - "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); - DataValue value = - opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get(); - Assert.assertEquals(new Variant(1.0), value.getValue()); - Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + try { + opcUaClient = + getOpcUaClient( + "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); + } catch (final PipeException e) { + if (e.getCause() instanceof ConnectException) { + continue; + } else { + throw e; + } + } + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + opcUaClient.disconnect().get(); + break; + } - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .alterPipe( - new TAlterPipeReq() - .setPipeName("testPipe") - .setIsReplaceAllConnectorAttributes(false) - .setConnectorAttributes(Collections.singletonMap("with-quality", "true")) - .setProcessorAttributes(Collections.emptyMap()) - .setExtractorAttributes(Collections.emptyMap())) - .getCode()); + while (true) { + final int[] ports = EnvUtils.searchAvailablePorts(); + tcpPort = ports[0]; + final int httpsPort = ports[1]; + sinkAttributes.put("tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("https.port", Integer.toString(httpsPort)); + sinkAttributes.put("with-quality", "true"); - TestUtils.executeNonQuery( + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .alterPipe( + new TAlterPipeReq() + .setPipeName("testPipe") + .setIsReplaceAllConnectorAttributes(true) + .setConnectorAttributes(sinkAttributes) + .setProcessorAttributes(Collections.emptyMap()) + .setExtractorAttributes(Collections.emptyMap())) + .getCode()); + try { + opcUaClient = + getOpcUaClient( + "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); + } catch (final PipeException e) { + if (e.getCause() instanceof ConnectException) { + continue; + } else { + throw e; + } + } + break; + } + + // Create aligned timeSeries to avoid tsFile parsing + TestUtils.executeNonQueries( env, - "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", + Arrays.asList( + "create aligned timeSeries root.db.opc(value double, quality boolean, other int32)", + "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)"), null); long startTime = System.currentTimeMillis(); @@ -168,11 +227,12 @@ public void testOPCUAServerSink() throws Exception { .getCode()); // Banned none, only allows basic256sha256 + final int finalTcpPort = tcpPort; Assert.assertThrows( PipeException.class, () -> getOpcUaClient( - "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", + "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb", SecurityPolicy.None, "root", "root")); @@ -188,6 +248,13 @@ public void testOPCUAServerSink() throws Exception { "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password **** conflicts to the new password ****, reject reusing.", e.getMessage()); } + } finally { + if (tcpPort >= 0) { + final String lockPath = EnvUtils.getLockFilePath(tcpPort); + if (!new File(lockPath).delete()) { + System.out.printf("Delete lock file %s failed%n", lockPath); + } + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java index 725ecbbae98a..402091598f13 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java @@ -102,11 +102,11 @@ public void run() { configurableUaClient.run(client); } catch (final Exception e) { throw new PipeException( - "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e); } } catch (final Exception e) { throw new PipeException( - "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e); } } }