Skip to content

Commit 80d5aea

Browse files
authored
[To dev/1.3] PipePlugin: Optimized the errorCode && Fixed the case-sensitive semantic (#16851) (#16852)
* PipePlugin: Optimized the errorCode && Fixed the case-sensitive semantic (#16851) * fix * fix * fix * fix * added
1 parent f2f9e97 commit 80d5aea

File tree

3 files changed

+221
-73
lines changed

3 files changed

+221
-73
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.junit.experimental.categories.Category;
3535
import org.junit.runner.RunWith;
3636

37+
import java.io.File;
3738
import java.sql.Connection;
3839
import java.sql.SQLException;
3940
import java.sql.Statement;
@@ -729,4 +730,64 @@ public void testValidPipeWithoutWithSink() {
729730
fail(e.getMessage());
730731
}
731732
}
733+
734+
@Test
735+
public void testPipePluginValidation() {
736+
try (final Connection connection = senderEnv.getConnection();
737+
final Statement statement = connection.createStatement()) {
738+
try {
739+
statement.execute(
740+
"create pipePlugin TestProcessor as 'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'xxx'");
741+
fail();
742+
} catch (final SQLException e) {
743+
Assert.assertEquals(
744+
"701: Untrusted uri xxx, current trusted_uri_pattern is file:.*", e.getMessage());
745+
}
746+
try {
747+
statement.execute(
748+
"create pipePlugin TestProcessor as 'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'file:.*'");
749+
fail();
750+
} catch (final SQLException e) {
751+
Assert.assertEquals("701: URI is not hierarchical", e.getMessage());
752+
}
753+
try {
754+
statement.execute(
755+
String.format(
756+
"create pipePlugin TestProcessor as 'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI '%s'",
757+
new File(
758+
System.getProperty("user.dir")
759+
+ File.separator
760+
+ "target"
761+
+ File.separator
762+
+ "test-classes"
763+
+ File.separator)
764+
.toURI()
765+
+ "PipePlugin.jar"));
766+
fail();
767+
} catch (final SQLException e) {
768+
Assert.assertEquals(
769+
"1603: Failed to get executable for PipePlugin TestProcessor, please check the URI.",
770+
e.getMessage());
771+
}
772+
try {
773+
statement.execute("drop pipePlugin test_processor");
774+
fail();
775+
} catch (final SQLException e) {
776+
Assert.assertEquals(
777+
"1601: Failed to drop PipePlugin [TEST_PROCESSOR], this PipePlugin has not been created",
778+
e.getMessage());
779+
}
780+
try {
781+
statement.execute("drop pipePlugin `Do-Nothing-Sink`");
782+
fail();
783+
} catch (final SQLException e) {
784+
Assert.assertEquals(
785+
"1601: Failed to drop PipePlugin [DO-NOTHING-SINK], the PipePlugin is a built-in PipePlugin",
786+
e.getMessage());
787+
}
788+
} catch (final SQLException e) {
789+
e.printStackTrace();
790+
fail(e.getMessage());
791+
}
792+
}
732793
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 77 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -821,85 +821,89 @@ public SettableFuture<ConfigTaskResult> createPipePlugin(
821821
return future;
822822
}
823823

824-
try (final ConfigNodeClient client =
825-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
826-
final String libRoot;
827-
final ByteBuffer jarFile;
828-
final String jarMd5;
824+
final String libRoot;
825+
final ByteBuffer jarFile;
826+
final String jarMd5;
829827

830-
final String jarFileName = new File(uriString).getName();
831-
try {
832-
final URI uri = new URI(uriString);
833-
if (uri.getScheme() == null) {
834-
future.setException(
835-
new IoTDBException(
836-
"The scheme of URI is not set, please specify the scheme of URI.",
837-
TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
838-
return future;
839-
}
840-
if (!uri.getScheme().equals("file")) {
841-
// Download executable
842-
final ExecutableResource resource =
843-
PipePluginExecutableManager.getInstance()
844-
.request(Collections.singletonList(uriString));
845-
final String jarFilePathUnderTempDir =
846-
PipePluginExecutableManager.getInstance()
847-
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
848-
+ jarFileName;
849-
// libRoot should be the path of the specified jar
850-
libRoot = jarFilePathUnderTempDir;
851-
jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
852-
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
853-
} else {
854-
// libRoot should be the path of the specified jar
855-
libRoot = new File(new URI(uriString)).getAbsolutePath();
856-
// If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to
857-
// ConfigNode.
858-
jarFile = ExecutableManager.transferToBytebuffer(libRoot);
859-
// Set md5 of the jar file
860-
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
861-
}
862-
} catch (final IOException | URISyntaxException e) {
863-
LOGGER.warn(
864-
"Failed to get executable for PipePlugin({}) using URI: {}.",
865-
createPipePluginStatement.getPluginName(),
866-
createPipePluginStatement.getUriString(),
867-
e);
828+
final String jarFileName = new File(uriString).getName();
829+
try {
830+
final URI uri = new URI(uriString);
831+
if (uri.getScheme() == null) {
868832
future.setException(
869833
new IoTDBException(
870-
"Failed to get executable for PipePlugin"
871-
+ createPipePluginStatement.getPluginName()
872-
+ "', please check the URI.",
834+
"The scheme of URI is not set, please specify the scheme of URI.",
873835
TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
874836
return future;
875837
}
838+
if (!uri.getScheme().equals("file")) {
839+
// Download executable
840+
final ExecutableResource resource =
841+
PipePluginExecutableManager.getInstance().request(Collections.singletonList(uriString));
842+
final String jarFilePathUnderTempDir =
843+
PipePluginExecutableManager.getInstance()
844+
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
845+
+ jarFileName;
846+
// libRoot should be the path of the specified jar
847+
libRoot = jarFilePathUnderTempDir;
848+
jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
849+
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
850+
} else {
851+
// libRoot should be the path of the specified jar
852+
libRoot = new File(new URI(uriString)).getAbsolutePath();
853+
// If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to
854+
// ConfigNode.
855+
jarFile = ExecutableManager.transferToBytebuffer(libRoot);
856+
// Set md5 of the jar file
857+
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
858+
}
859+
} catch (final URISyntaxException | IllegalArgumentException e) {
860+
future.setException(
861+
new IoTDBException(e.getMessage(), TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
862+
return future;
863+
} catch (final IOException e) {
864+
LOGGER.warn(
865+
"Failed to get executable for PipePlugin({}) using URI: {}.",
866+
createPipePluginStatement.getPluginName(),
867+
createPipePluginStatement.getUriString(),
868+
e);
869+
future.setException(
870+
new IoTDBException(
871+
"Failed to get executable for PipePlugin "
872+
+ createPipePluginStatement.getPluginName()
873+
+ ", please check the URI.",
874+
TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
875+
return future;
876+
}
876877

877-
// try to create instance, this request will fail if creation is not successful
878-
try (final PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
879-
// ensure that jar file contains the class and the class is a pipe plugin
880-
final Class<?> clazz =
881-
Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
882-
final PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
883-
} catch (final ClassNotFoundException
884-
| NoSuchMethodException
885-
| InstantiationException
886-
| IllegalAccessException
887-
| InvocationTargetException
888-
| ClassCastException e) {
889-
LOGGER.warn(
890-
"Failed to create function when try to create PipePlugin({}) instance first.",
891-
createPipePluginStatement.getPluginName(),
892-
e);
893-
future.setException(
894-
new IoTDBException(
895-
"Failed to load class '"
896-
+ createPipePluginStatement.getClassName()
897-
+ "', because it's not found in jar file or is invalid: "
898-
+ createPipePluginStatement.getUriString(),
899-
TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
900-
return future;
901-
}
878+
// try to create instance, this request will fail if creation is not successful
879+
try (final PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
880+
// ensure that jar file contains the class and the class is a pipe plugin
881+
final Class<?> clazz =
882+
Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
883+
final PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
884+
} catch (final ClassNotFoundException
885+
| NoSuchMethodException
886+
| InstantiationException
887+
| IllegalAccessException
888+
| InvocationTargetException
889+
| ClassCastException
890+
| IOException e) {
891+
LOGGER.warn(
892+
"Failed to create pipePlugin when try to create PipePlugin({}) instance first.",
893+
createPipePluginStatement.getPluginName(),
894+
e);
895+
future.setException(
896+
new IoTDBException(
897+
"Failed to load class '"
898+
+ createPipePluginStatement.getClassName()
899+
+ "', because it's not found in jar file or is invalid: "
900+
+ createPipePluginStatement.getUriString(),
901+
TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
902+
return future;
903+
}
902904

905+
try (final ConfigNodeClient client =
906+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
903907
final TSStatus executionStatus =
904908
client.createPipePlugin(
905909
new TCreatePipePluginReq()
@@ -924,7 +928,7 @@ public SettableFuture<ConfigTaskResult> createPipePlugin(
924928
} else {
925929
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
926930
}
927-
} catch (final ClientManagerException | TException | IOException e) {
931+
} catch (final ClientManagerException | TException e) {
928932
future.setException(e);
929933
}
930934
return future;
@@ -939,7 +943,7 @@ public SettableFuture<ConfigTaskResult> dropPipePlugin(
939943
final TSStatus executionStatus =
940944
client.dropPipePlugin(
941945
new TDropPipePluginReq()
942-
.setPluginName(dropPipePluginStatement.getPluginName())
946+
.setPluginName(dropPipePluginStatement.getPluginName().toUpperCase())
943947
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()));
944948
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
945949
LOGGER.warn(
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.execution;
21+
22+
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
23+
import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement;
24+
25+
import org.junit.Assert;
26+
import org.junit.Test;
27+
28+
import java.io.File;
29+
30+
import static org.junit.Assert.fail;
31+
32+
public class ClusterConfigTaskExecutorTest {
33+
34+
@Test
35+
public void testPipePlugin() {
36+
try {
37+
ClusterConfigTaskExecutor.getInstance()
38+
.createPipePlugin(
39+
new CreatePipePluginStatement("TestProcessor", false, "someClass", "uri"))
40+
.get();
41+
Assert.fail();
42+
} catch (final Exception e) {
43+
Assert.assertTrue(
44+
e.getMessage()
45+
.contains("The scheme of URI is not set, please specify the scheme of URI."));
46+
}
47+
48+
try {
49+
ClusterConfigTaskExecutor.getInstance()
50+
.createPipePlugin(
51+
new CreatePipePluginStatement("TestProcessor", false, "someClass", "file:.*"))
52+
.get();
53+
Assert.fail();
54+
} catch (final Exception e) {
55+
Assert.assertTrue(e.getMessage().contains("URI is not hierarchical"));
56+
}
57+
58+
try {
59+
ClusterConfigTaskExecutor.getInstance()
60+
.createPipePlugin(
61+
new CreatePipePluginStatement(
62+
"TestProcessor",
63+
false,
64+
"org.apache.iotdb.db.pipe.example.TestProcessor",
65+
new File(
66+
System.getProperty("user.dir")
67+
+ File.separator
68+
+ "target"
69+
+ File.separator
70+
+ "test-classes"
71+
+ File.separator)
72+
.toURI()
73+
+ "PipePlugin.jar"))
74+
.get();
75+
fail();
76+
} catch (final Exception e) {
77+
Assert.assertTrue(
78+
e.getMessage()
79+
.contains(
80+
"Failed to get executable for PipePlugin TestProcessor, please check the URI."));
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)