Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
341cb2d
pj
Caideyipi Dec 22, 2025
109a3fc
cj
Caideyipi Dec 22, 2025
efa0fe4
bone
Caideyipi Dec 22, 2025
8efcd63
fix
Caideyipi Dec 22, 2025
fd39559
fix
Caideyipi Dec 22, 2025
6be7b4d
framework
Caideyipi Dec 22, 2025
f78e6d7
fix
Caideyipi Dec 22, 2025
ef67611
trilog
Caideyipi Dec 22, 2025
b871925
framework
Caideyipi Dec 22, 2025
97897c3
fix
Caideyipi Dec 22, 2025
97db004
fix
Caideyipi Dec 22, 2025
2da0e40
yl
Caideyipi Dec 22, 2025
667a148
stack-client
Caideyipi Dec 22, 2025
205cd19
fix
Caideyipi Dec 22, 2025
4901a6f
might
Caideyipi Dec 23, 2025
f2735bb
sleep-removal
Caideyipi Dec 23, 2025
ed07970
cleaning
Caideyipi Dec 23, 2025
b1b7e2f
fix
Caideyipi Dec 23, 2025
d78b11d
sec-dir
Caideyipi Dec 23, 2025
5edd5fd
cleaning
Caideyipi Dec 23, 2025
2a3cdcf
remove-poison
Caideyipi Dec 23, 2025
8d9d27b
Merge branch 'master' of https://github.com/apache/iotdb into client-opc
Caideyipi Dec 23, 2025
cacf806
f
Caideyipi Dec 23, 2025
20095a1
fix
Caideyipi Dec 23, 2025
77c9cd3
clean-sit
Caideyipi Dec 23, 2025
74a917c
sit-comp
Caideyipi Dec 23, 2025
0abc6b2
object
Caideyipi Dec 23, 2025
1f07ae3
many-clean
Caideyipi Dec 23, 2025
4a1ad70
sit-sit
Caideyipi Dec 23, 2025
0c556ea
fix
Caideyipi Dec 23, 2025
c6d1170
fix
Caideyipi Dec 23, 2025
4d70088
fix
Caideyipi Dec 23, 2025
6f25d12
ref
Caideyipi Dec 23, 2025
727f007
sit
Caideyipi Dec 23, 2025
2645e83
partial
Caideyipi Dec 24, 2025
48d02de
security-policies
Caideyipi Dec 24, 2025
3b89d41
check-equals
Caideyipi Dec 24, 2025
f5baef5
check-err
Caideyipi Dec 24, 2025
32c0a61
fix
Caideyipi Dec 24, 2025
5203eb4
Merge branch 'master' of https://github.com/apache/iotdb into client-opc
Caideyipi Dec 25, 2025
4861ca3
compile-fix
Caideyipi Dec 25, 2025
efd189b
adjust
Caideyipi Dec 25, 2025
bf72c19
ut
Caideyipi Dec 25, 2025
e19adb5
refactor
Caideyipi Dec 25, 2025
f8888a1
fix_and_IT
Caideyipi Dec 25, 2025
2d0c750
fix
Caideyipi Dec 25, 2025
39b13b1
placeholder
Caideyipi Dec 25, 2025
ae331f4
rollback
Caideyipi Dec 25, 2025
012e666
eliminate-fault
Caideyipi Dec 25, 2025
9bf0c49
pw
Caideyipi Dec 26, 2025
72e2f68
fix
Caideyipi Dec 26, 2025
4cd4299
f
Caideyipi Dec 26, 2025
803825c
fix
Caideyipi Dec 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT {

@Before
public void setUp() {
MultiEnvFactory.createEnv(2);
MultiEnvFactory.createEnv(1);
env = MultiEnvFactory.getEnv(0);
env.getConfig()
.getCommonConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,68 +20,174 @@
package org.apache.iotdb.pipe.it.single;

import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.common.conf.TSFileConfig;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT1.class})
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
@Test
public void testOPCUASink() throws Exception {
public void testOPCUAServerSink() throws Exception {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {

TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);

final Map<String, String> connectorAttributes = new HashMap<>();
connectorAttributes.put("sink", "opc-ua-sink");
connectorAttributes.put("opcua.model", "client-server");
final Map<String, String> sinkAttributes = new HashMap<>();

sinkAttributes.put("sink", "opc-ua-sink");
sinkAttributes.put("opcua.model", "client-server");
sinkAttributes.put("security-policy", "None");

final int[] ports = EnvUtils.searchAvailablePorts();
final int tcpPort = ports[0];
final int httpsPort = ports[1];
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("https.port", Integer.toString(httpsPort));

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(Collections.emptyMap())
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.singletonMap("user", "root"))
.setProcessorAttributes(Collections.emptyMap()))
.getCode());

final OpcUaClient opcUaClient =
getOpcUaClient(
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
DataValue value =
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.alterPipe(
new TAlterPipeReq()
.setPipeName("testPipe")
.setIsReplaceAllConnectorAttributes(false)
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
.setProcessorAttributes(Collections.emptyMap())
.setExtractorAttributes(Collections.emptyMap()))
.getCode());

TestUtils.executeNonQuery(
env,
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
null);

long startTime = System.currentTimeMillis();
while (true) {
try {
value =
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
break;
} catch (final Throwable t) {
if (System.currentTimeMillis() - startTime > 10_000L) {
throw t;
}
}
}

TestUtils.executeNonQuery(
env, "insert into root.db.opc(time, quality) values (2, true)", null);
TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null);

startTime = System.currentTimeMillis();
while (true) {
try {
value =
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime());
Assert.assertEquals(new Variant(2.0), value.getValue());
Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
break;
} catch (final Throwable t) {
if (System.currentTimeMillis() - startTime > 10_000L) {
throw t;
}
}
}

opcUaClient.disconnect().get();
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());

// Test reconstruction
connectorAttributes.put("password123456", "test");
sinkAttributes.put("password", "test");
sinkAttributes.put("security-policy", "basic256sha256");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());

// Banned none, only allows basic256sha256
Assert.assertThrows(
PipeException.class,
() ->
getOpcUaClient(
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
SecurityPolicy.None,
"root",
"root"));

// Test conflict
connectorAttributes.put("password123456", "conflict");
Assert.assertEquals(
TSStatusCode.PIPE_ERROR.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
sinkAttributes.put("password", "conflict");
try {
TestUtils.executeNonQuery(
env, "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict')", null);
Assert.fail();
} catch (final Exception e) {
Assert.assertEquals(
"org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password **** conflicts to the new password ****, reject reusing.",
e.getMessage());
}
}
}

Expand All @@ -93,42 +199,74 @@ public void testOPCUASinkInTableModel() throws Exception {
TableModelUtils.createDataBaseAndTable(env, "test", "test");
TableModelUtils.insertData("test", "test", 0, 10, env);

final Map<String, String> connectorAttributes = new HashMap<>();
connectorAttributes.put("sink", "opc-ua-sink");
connectorAttributes.put("opcua.model", "client-server");
final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();
sourceAttributes.put("capture.table", "true");
sourceAttributes.put("user", "root");

sinkAttributes.put("sink", "opc-ua-sink");
sinkAttributes.put("opcua.model", "client-server");

final int[] ports = EnvUtils.searchAvailablePorts();
final int tcpPort = ports[0];
final int httpsPort = ports[1];
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("https.port", Integer.toString(httpsPort));

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(Collections.singletonMap("capture.table", "true"))
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());

// Test reconstruction
connectorAttributes.put("password123456", "test");
sinkAttributes.put("password123456", "test");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());

// Test conflict
connectorAttributes.put("password123456", "conflict");
sinkAttributes.put("password123456", "conflict");
Assert.assertEquals(
TSStatusCode.PIPE_ERROR.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
}
}

private static OpcUaClient getOpcUaClient(
final String nodeUrl,
final SecurityPolicy policy,
final String userName,
final String password) {
final IoTDBOpcUaClient client;

final IdentityProvider provider =
Objects.nonNull(userName)
? new UsernameProvider(userName, password)
: new AnonymousProvider();

final String securityDir =
CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+ File.separatorChar
+ UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));

client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
new ClientRunner(client, securityDir, password).run();
return client.getClient();
}
}
12 changes: 12 additions & 0 deletions iotdb-core/datanode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,18 @@
<groupId>org.eclipse.milo</groupId>
<artifactId>stack-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>stack-client</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
Expand Down
Loading
Loading