Skip to content

Commit

Permalink
branch-3.0: [opt](jdbc catalog) Compatible with higher ClickHouse JDB…
Browse files Browse the repository at this point in the history
…C Driver versions #46026 (#48182)

Cherry-picked from #46026

Co-authored-by: zy-kkk <[email protected]>
  • Loading branch information
github-actions[bot] and zy-kkk authored Feb 22, 2025
1 parent d2464dd commit 6caa3cf
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ private void testFeToJdbcConnection() throws DdlException {
jdbcClient.testConnection();
} catch (JdbcClientException e) {
String errorMessage = "Test FE Connection to JDBC Failed: " + e.getMessage();
LOG.error(errorMessage, e);
LOG.warn(errorMessage, e);
throw new DdlException(errorMessage, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,103 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema;

import com.google.common.collect.Lists;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

public class JdbcClickHouseClient extends JdbcClient {

private final Boolean databaseTermIsCatalog;

protected JdbcClickHouseClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
try (Connection conn = getConnection()) {
String jdbcUrl = conn.getMetaData().getURL();
if (!isNewClickHouseDriver(getJdbcDriverVersion())) {
this.databaseTermIsCatalog = false;
} else {
this.databaseTermIsCatalog = "catalog".equalsIgnoreCase(getDatabaseTermFromUrl(jdbcUrl));
}
} catch (SQLException e) {
throw new JdbcClientException("Failed to initialize JdbcClickHouseClient: %s", e.getMessage());
}
}

@Override
public List<String> getDatabaseNameList() {
Connection conn = null;
ResultSet rs = null;
List<String> remoteDatabaseNames = Lists.newArrayList();
try {
conn = getConnection();
if (isOnlySpecifiedDatabase && includeDatabaseMap.isEmpty() && excludeDatabaseMap.isEmpty()) {
if (databaseTermIsCatalog) {
remoteDatabaseNames.add(conn.getCatalog());
} else {
remoteDatabaseNames.add(conn.getSchema());
}
} else {
if (databaseTermIsCatalog) {
rs = conn.getMetaData().getCatalogs();
} else {
rs = conn.getMetaData().getSchemas(conn.getCatalog(), null);
}
while (rs.next()) {
remoteDatabaseNames.add(rs.getString(1));
}
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get database name list from jdbc", e);
} finally {
close(rs, conn);
}
return filterDatabaseNames(remoteDatabaseNames);
}

@Override
protected void processTable(String remoteDbName, String remoteTableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
Connection conn = null;
ResultSet rs = null;
try {
conn = super.getConnection();
DatabaseMetaData databaseMetaData = conn.getMetaData();
if (databaseTermIsCatalog) {
rs = databaseMetaData.getTables(remoteDbName, null, remoteTableName, tableTypes);
} else {
rs = databaseMetaData.getTables(null, remoteDbName, remoteTableName, tableTypes);
}
resultSetConsumer.accept(rs);
} catch (SQLException e) {
throw new JdbcClientException("Failed to process table", e);
} finally {
close(rs, conn);
}
}

@Override
protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String catalogName, String remoteDbName,
String remoteTableName) throws SQLException {
if (databaseTermIsCatalog) {
return databaseMetaData.getColumns(remoteDbName, null, remoteTableName, null);
} else {
return databaseMetaData.getColumns(catalogName, remoteDbName, remoteTableName, null);
}
}

@Override
protected String getCatalogName(Connection conn) throws SQLException {
if (databaseTermIsCatalog) {
return null;
} else {
return conn.getCatalog();
}
}

@Override
Expand Down Expand Up @@ -121,4 +212,43 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
return Type.UNSUPPORTED;
}
}

/**
* Determine whether the driver version is greater than or equal to 0.5.0.
*/
private static boolean isNewClickHouseDriver(String driverVersion) {
if (driverVersion == null) {
throw new JdbcClientException("Driver version cannot be null");
}
try {
String[] versionParts = driverVersion.split("\\.");
int majorVersion = Integer.parseInt(versionParts[0]);
int minorVersion = Integer.parseInt(versionParts[1]);
// Determine whether it is greater than or equal to 0.5.x
return (majorVersion > 0) || (majorVersion == 0 && minorVersion >= 5);
} catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
throw new JdbcClientException("Invalid clickhouse driver version format: " + driverVersion, e);
}
}

/**
* Extract databaseterm parameters from the jdbc url.
*/
private String getDatabaseTermFromUrl(String jdbcUrl) {
if (jdbcUrl != null && jdbcUrl.toLowerCase().contains("databaseterm=schema")) {
return "schema";
}
return "catalog";
}

/**
* Get the driver version.
*/
public String getJdbcDriverVersion() {
try (Connection conn = getConnection()) {
return conn.getMetaData().getDriverVersion();
} catch (SQLException e) {
throw new JdbcClientException("Failed to get jdbc driver version", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -480,4 +480,16 @@ public void testConnection() {
public String getTestQuery() {
return "select 1";
}

public String getJdbcDriverVersion() {
Connection conn = null;
try {
conn = getConnection();
return conn.getMetaData().getDriverVersion();
} catch (SQLException e) {
throw new JdbcClientException("Failed to get jdbc driver version", e);
} finally {
close(conn);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public JdbcClient createClient(JdbcClientConfig jdbcClientConfig) throws JdbcCli
throw new JdbcClientException("Failed to determine OceanBase compatibility mode");
}
} catch (SQLException e) {
throw new JdbcClientException("Failed to initialize JdbcOceanBaseClient", e.getMessage());
throw new JdbcClientException("Failed to initialize JdbcOceanBaseClient: %s", e.getMessage());
} finally {
close(rs, stmt, conn);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.jdbc.client;

import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.Method;

public class JdbcClickHouseClientTest {

@Test
public void testIsNewClickHouseDriver() {
try {
Method method = JdbcClickHouseClient.class.getDeclaredMethod("isNewClickHouseDriver", String.class);
method.setAccessible(true);

// Valid test cases
Assert.assertTrue((boolean) method.invoke(null, "0.5.0")); // Major version 0, Minor version 5
Assert.assertTrue((boolean) method.invoke(null, "1.0.0")); // Major version 1
Assert.assertTrue((boolean) method.invoke(null, "0.6.3 (revision: a6a8a22)")); // Major version 0, Minor version 6
Assert.assertFalse((boolean) method.invoke(null, "0.4.2 (revision: 1513b27)")); // Major version 0, Minor version 4

// Invalid version formats
try {
method.invoke(null, "invalid.version"); // Invalid version format
Assert.fail("Expected JdbcClientException for invalid version 'invalid.version'");
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof JdbcClientException);
Assert.assertTrue(e.getCause().getMessage().contains("Invalid clickhouse driver version format"));
}

try {
method.invoke(null, ""); // Empty version
Assert.fail("Expected JdbcClientException for empty version");
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof JdbcClientException);
Assert.assertTrue(e.getCause().getMessage().contains("Invalid clickhouse driver version format"));
}

try {
method.invoke(null, (Object) null); // Null version
Assert.fail("Expected JdbcClientException for null version");
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof JdbcClientException);
Assert.assertTrue(e.getCause().getMessage().contains("Driver version cannot be null"));
}
} catch (Exception e) {
Assert.fail("Exception occurred while testing isNewClickHouseDriver: " + e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.jdbc;

import org.apache.doris.datasource.jdbc.client.JdbcClientException;
package org.apache.doris.datasource.jdbc.client;

import org.junit.Assert;
import org.junit.Test;
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ suite("test_clickhouse_jdbc_catalog", "p0,external,clickhouse,external_docker,ex
String s3_endpoint = getS3Endpoint()
String bucket = getS3BucketName()
String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/clickhouse-jdbc-0.4.2-all.jar"
String driver_url_7 = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/clickhouse-jdbc-0.7.1-patch1-all.jar"

String inDorisTable = "test_clickhouse_jdbc_doris_in_tb";

Expand Down Expand Up @@ -91,33 +92,57 @@ suite("test_clickhouse_jdbc_catalog", "p0,external,clickhouse,external_docker,ex

order_qt_dt_with_tz """ select * from dt_with_tz order by id; """

sql """create catalog if not exists clickhouse_catalog_test_conn_correct properties(
sql """ drop catalog if exists ${catalog_name} """


sql """ drop catalog if exists clickhouse_7_default """
sql """ create catalog if not exists clickhouse_7_default properties(
"type"="jdbc",
"user"="default",
"password"="123456",
"jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test",
"driver_url" = "${driver_url}",
"driver_class" = "com.clickhouse.jdbc.ClickHouseDriver",
"test_connection" = "true"
);
"""
order_qt_test_conn_correct """ select * from clickhouse_catalog_test_conn_correct.doris_test.type; """

test {
sql """create catalog if not exists clickhouse_catalog_test_conn_mistake properties(
"type"="jdbc",
"user"="default",
"password"="1234567",
"jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test",
"driver_url" = "${driver_url}",
"driver_class" = "com.clickhouse.jdbc.ClickHouseDriver",
"test_connection" = "true"
);
"""
exception "Test FE Connection to JDBC Failed"
}
sql """ drop catalog if exists ${catalog_name} """
sql """ drop catalog if exists clickhouse_catalog_test_conn_correct """
sql """ drop catalog if exists clickhouse_catalog_test_conn_mistake """
"driver_url" = "${driver_url_7}",
"driver_class" = "com.clickhouse.jdbc.ClickHouseDriver"
);"""

order_qt_clickhouse_7_default """ select * from clickhouse_7_default.doris_test.type; """
order_qt_clickhouse_7_default_tvf """ select * from query('catalog' = 'clickhouse_7_default', 'query' = 'select * from doris_test.type;') order by 1; """
order_qt_clickhouse_7_default_tvf_arr """ select * from query('catalog' = 'clickhouse_7_default', 'query' = 'select * from doris_test.arr;') order by 1; """

sql """ drop catalog if exists clickhouse_7_default """

sql """ drop catalog if exists clickhouse_7_catalog """

sql """ create catalog if not exists clickhouse_7_catalog properties(
"type"="jdbc",
"user"="default",
"password"="123456",
"jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test?databaseTerm=catalog",
"driver_url" = "${driver_url_7}",
"driver_class" = "com.clickhouse.jdbc.ClickHouseDriver"
);"""

order_qt_clickhouse_7_catalog """ select * from clickhouse_7_catalog.doris_test.type; """
order_qt_clickhouse_7_catalog_tvf """ select * from query('catalog' = 'clickhouse_7_catalog', 'query' = 'select * from doris_test.type;') order by 1; """
order_qt_clickhouse_7_catalog_tvf_arr """ select * from query('catalog' = 'clickhouse_7_catalog', 'query' = 'select * from doris_test.arr;') order by 1; """

sql """ drop catalog if exists clickhouse_7_catalog """

sql """ drop catalog if exists clickhouse_7_schema """

sql """ create catalog if not exists clickhouse_7_schema properties(
"type"="jdbc",
"user"="default",
"password"="123456",
"jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test?databaseTerm=schema",
"driver_url" = "${driver_url_7}",
"driver_class" = "com.clickhouse.jdbc.ClickHouseDriver"
);"""

order_qt_clickhouse_7_schema """ select * from clickhouse_7_schema.doris_test.type; """
order_qt_clickhouse_7_schema_tvf """ select * from query('catalog' = 'clickhouse_7_schema', 'query' = 'select * from doris_test.type;') order by 1; """
order_qt_clickhouse_7_schema_tvf_arr """ select * from query('catalog' = 'clickhouse_7_schema', 'query' = 'select * from doris_test.arr;') order by 1; """

sql """ drop catalog if exists clickhouse_7_schema """
}
}

0 comments on commit 6caa3cf

Please sign in to comment.