From b6bd767a13e31998220aab7727cf0f0c9f4a0dc6 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 13 Mar 2025 17:28:55 +0800 Subject: [PATCH 01/10] update doris docker images to 2.1.7 fix timezone format code modify workflow images modify fe.conf to avoid oom modify flight port modify workflow modify workflow modify workflow modify workflow modify workflow modify workflow update workflow --- .github/workflows/build-connector.yml | 93 +++++----- .github/workflows/checkstyle.yaml | 18 +- .github/workflows/license-eyes.yml | 7 +- .github/workflows/run-e2ecase.yml | 31 ++-- .github/workflows/run-itcase.yml | 34 ++-- .../apache/doris/flink/sink/BackendUtil.java | 2 +- .../flink/container/config/DorisPorts.java | 40 +++++ .../container/e2e/Doris2DorisE2ECase.java | 17 +- .../container/instance/DorisContainer.java | 164 +++++++++++++++--- .../flink/sink/DorisSinkFailoverITCase.java | 24 ++- .../doris/flink/source/DorisSourceITCase.java | 3 +- .../src/test/resources/docker/doris/be.conf | 8 +- .../src/test/resources/docker/doris/fe.conf | 4 +- 13 files changed, 328 insertions(+), 117 deletions(-) create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/container/config/DorisPorts.java diff --git a/.github/workflows/build-connector.yml b/.github/workflows/build-connector.yml index 2b2a242be..dc52a7906 100644 --- a/.github/workflows/build-connector.yml +++ b/.github/workflows/build-connector.yml @@ -23,59 +23,56 @@ on: jobs: build-extension: - name: "Build Connector" + name: "Build Connector (Flink ${{ matrix.flink-version }})" runs-on: ubuntu-latest + strategy: + matrix: + flink-version: ['1.15', '1.16', '1.17', '1.18', '1.19', '1.20'] + include: + - flink-version: '1.15' + flink-full-version: '1.15.0' + flink-python-id: 'flink-python_2.12' + - flink-version: '1.16' + flink-full-version: '1.16.0' + flink-python-id: 'flink-python' + - flink-version: '1.17' + flink-full-version: '1.17.0' + flink-python-id: 'flink-python' + - flink-version: '1.18' + flink-full-version: '1.18.0' + flink-python-id: 'flink-python' + - flink-version: '1.19' + flink-full-version: '1.19.0' + flink-python-id: 'flink-python' + - flink-version: '1.20' + flink-full-version: '1.20.0' + flink-python-id: 'flink-python' defaults: run: shell: bash steps: - - name: Checkout - uses: actions/checkout@master + - name: Checkout + uses: actions/checkout@v4 - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '8' + cache: 'maven' - - name: Build flink connector 1.15 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.15.0 \ - -Dflink.minor.version=1.15 \ - -Dflink.python.id=flink-python_2.12 + - name: Cache Maven Repository + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + ${{ runner.os }}-maven- - - name: Build flink connector 1.16 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.16.0 \ - -Dflink.minor.version=1.16 \ - -Dflink.python.id=flink-python - - - name: Build flink connector 1.17 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.17.0 \ - -Dflink.minor.version=1.17 \ - -Dflink.python.id=flink-python - - - name: Build flink connector 1.18 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.18.0 \ - -Dflink.minor.version=1.18 \ - -Dflink.python.id=flink-python - - - name: Build flink connector 1.19 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.19.0 \ - -Dflink.minor.version=1.19 \ - -Dflink.python.id=flink-python - - - name: Build flink connector 1.20 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.20.0 \ - -Dflink.minor.version=1.20 \ - -Dflink.python.id=flink-python + - name: Build Flink Connector + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=${{ matrix.flink-full-version }} \ + -Dflink.minor.version=${{ matrix.flink-version }} \ + -Dflink.python.id=${{ matrix.flink-python-id }} \ No newline at end of file diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index 325ed71a6..4b3e35a14 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -27,17 +27,25 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: persist-credentials: false submodules: recursive - name: Setup java - uses: actions/setup-java@v2 + uses: actions/setup-java@v4 with: - distribution: adopt + distribution: 'temurin' java-version: '8' + - name: Cache Maven Repository + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}-checkstyle + restore-keys: | + ${{ runner.os }}-maven- + - name: Run java checkstyle - run: - cd flink-doris-connector && mvn clean compile checkstyle:checkstyle \ No newline at end of file + run: | + cd flink-doris-connector && mvn clean compile checkstyle:checkstyle -Dcheckstyle.skip=false -Dspotless.check.skip=false \ No newline at end of file diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml index c9b4cd253..6ea6d3bf6 100644 --- a/.github/workflows/license-eyes.yml +++ b/.github/workflows/license-eyes.yml @@ -28,8 +28,11 @@ jobs: runs-on: ubuntu-latest steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@v2 + uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Check License - uses: apache/skywalking-eyes@v0.2.0 + uses: apache/skywalking-eyes@v0.5.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml index e6318f8f9..ab0727368 100644 --- a/.github/workflows/run-e2ecase.yml +++ b/.github/workflows/run-e2ecase.yml @@ -20,25 +20,36 @@ name: Run E2ECases on: pull_request: push: + branches: + - master jobs: build-extension: name: "Run E2ECases" runs-on: ubuntu-latest + env: + TZ: Asia/Shanghai defaults: run: shell: bash steps: - - name: Checkout - uses: actions/checkout@master + - name: Checkout + uses: actions/checkout@v4 - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' + - name: Setup java + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '8' - - name: Run E2ECases - run: | - cd flink-doris-connector && mvn test -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + - name: Cache Maven Repository + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}-e2e + restore-keys: | + ${{ runner.os }}-maven- + - name: Run E2ECases + run: | + cd flink-doris-connector && mvn test -Dtest="*E2ECase" -Dimage="yagagagaga/doris-standalone:2.1.7" \ No newline at end of file diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml index 0b6706560..73cdacf5b 100644 --- a/.github/workflows/run-itcase.yml +++ b/.github/workflows/run-itcase.yml @@ -20,25 +20,39 @@ name: Run ITCases on: pull_request: push: + - master jobs: build-extension: name: "Run ITCases" runs-on: ubuntu-latest + strategy: + matrix: + java-version: [8] + env: + TZ: Asia/Shanghai defaults: run: shell: bash + steps: - - name: Checkout - uses: actions/checkout@master + - name: Checkout + uses: actions/checkout@v4 - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' + - name: Setup java + uses: actions/setup-java@v4 + with: + java-version: ${{ matrix.java-version }} + distribution: 'temurin' - - name: Run ITCases - run: | - cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + - name: Cache Maven Repository + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- + - name: Run ITCases + run: | + cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="yagagagaga/doris-standalone:2.1.7" \ No newline at end of file diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 26771c9d9..38fa7024e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -119,7 +119,7 @@ public static boolean tryHttpConnection(String host) { responseMessage); return false; } catch (Exception ex) { - LOG.warn("Failed to connect to host:{}", host, ex); + LOG.warn("Failed to connect to host: {} ", host, ex); return false; } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/config/DorisPorts.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/config/DorisPorts.java new file mode 100644 index 000000000..85408c05e --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/config/DorisPorts.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.container.config; + +/** + * DorisPorts class contains default port configurations for Apache Doris Can be used in E2E testing + * and other scenarios requiring access to Doris services + */ +public class DorisPorts { + /** FE (Frontend) related ports */ + public static final class FE { + + public static final int HTTP_PORT = 8030; + public static final int QUERY_PORT = 9030; + public static final int FLIGHT_SQL_PORT = 10030; + } + + /** BE (Backend) related ports */ + public static final class BE { + + public static final int THRIFT_PORT = 9060; + public static final int WEBSERVICE_PORT = 8040; + public static final int FLIGHT_SQL_PORT = 10040; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index 1b56be220..aeecf2992 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -23,6 +23,7 @@ import org.apache.doris.flink.container.AbstractContainerTestBase; import org.apache.doris.flink.container.ContainerUtils; +import org.apache.doris.flink.container.config.DorisPorts.FE; import org.apache.doris.flink.table.DorisConfigOptions; import org.junit.Test; import org.junit.runner.RunWith; @@ -30,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.ZoneId; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -60,7 +62,9 @@ public void testDoris2Doris() throws Exception { env.setParallelism(2); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - + tEnv.getConfig() + .getConfiguration() + .setString("table.local-time-zone", ZoneId.systemDefault().getId()); String sourceDDL = String.format( "CREATE TABLE doris_source (" @@ -96,13 +100,14 @@ public void testDoris2Doris() throws Exception { + " 'username' = '%s'," + " 'password' = '%s'," + " 'source.use-flight-sql' = '%s',\n" - + " 'source.flight-sql-port' = '9611'" + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE_SOURCE + "." + TABLE, getDorisUsername(), getDorisPassword(), - useFlightRead); + useFlightRead, + FE.FLIGHT_SQL_PORT); tEnv.executeSql(sourceDDL); String sinkDDL = @@ -150,9 +155,9 @@ public void testDoris2Doris() throws Exception { List excepted = Arrays.asList( - "1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello, Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", \"key2\":\"value2\"},{\"name\": \"Tom\", \"age\": 30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}", - "2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", \"k2\":\"v2\"},{\"name\": \"Jerry\", \"age\": 25},{\"status\":\"ok\"},{\"data\":[1,2,3]}", - "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\": \"Alice\", \"age\": 40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}", + "1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello, Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", \"key2\":\"value2\"},{\"name\":\"Tom\", \"age\":30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}", + "2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", \"k2\":\"v2\"},{\"name\":\"Jerry\", \"age\":25},{\"status\":\"ok\"},{\"data\":[1,2,3]}", + "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\":\"Alice\", \"age\":40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}", "4,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null"); String query = String.format("SELECT * FROM %s.%s", DATABASE_SINK, TABLE); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index 1bd483c3c..4b62d3414 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -17,13 +17,20 @@ package org.apache.doris.flink.container.instance; +import com.github.dockerjava.api.command.RestartContainerCmd; import com.google.common.collect.Lists; +import org.apache.doris.flink.container.config.DorisPorts.BE; +import org.apache.doris.flink.container.config.DorisPorts.FE; import org.apache.doris.flink.exception.DorisRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException; import org.testcontainers.utility.DockerLoggerFactory; import org.testcontainers.utility.MountableFile; @@ -40,15 +47,17 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; public class DorisContainer implements ContainerService { private static final Logger LOG = LoggerFactory.getLogger(DorisContainer.class); - private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0"; + private static final String DEFAULT_DOCKER_IMAGE = "yagagagaga/doris-standalone:2.1.7"; private static final String DORIS_DOCKER_IMAGE = System.getProperty("image") == null ? DEFAULT_DOCKER_IMAGE @@ -58,15 +67,16 @@ public class DorisContainer implements ContainerService { private static final String JDBC_URL = "jdbc:mysql://%s:9030"; private static final String USERNAME = "root"; private static final String PASSWORD = ""; - private final GenericContainer dorisContainer; + private final GenericContainer dorisContainer; + private final String systemTimeZone = ZoneId.systemDefault().getId(); public DorisContainer() { dorisContainer = createDorisContainer(); } - public GenericContainer createDorisContainer() { + public GenericContainer createDorisContainer() { LOG.info("Will create doris containers."); - GenericContainer container = + GenericContainer container = new GenericContainer<>(DORIS_DOCKER_IMAGE) .withNetwork(Network.newNetwork()) .withNetworkAliases("DorisContainer") @@ -74,6 +84,10 @@ public GenericContainer createDorisContainer() { .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))) + .withCommand( + "sh", + "-c", + "chmod -R 644 /root/be/conf/be.conf /root/fe/conf/fe.conf && chmod -R 755 /root/be/conf /root/fe/conf && chown -R root:root /root/be/conf /root/fe/conf") // use customer conf .withCopyFileToContainer( MountableFile.forClasspathResource("docker/doris/be.conf"), @@ -81,16 +95,29 @@ public GenericContainer createDorisContainer() { .withCopyFileToContainer( MountableFile.forClasspathResource("docker/doris/fe.conf"), "/opt/apache-doris/fe/conf/fe.conf") - .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610); + // These exposed ports are used to connect to Doris. They are the default + // ports for yagagagaga/doris-standalone:2.1.7. + // For more information, see: + // https://hub.docker.com/r/yagagagaga/doris-standalone + .withExposedPorts( + FE.HTTP_PORT, + FE.QUERY_PORT, + BE.THRIFT_PORT, + BE.WEBSERVICE_PORT, + FE.FLIGHT_SQL_PORT, + BE.FLIGHT_SQL_PORT) + .withStartupTimeout(Duration.ofMinutes(5)) + .withEnv("TZ", systemTimeZone) + .waitingFor(Wait.forListeningPort()); container.setPortBindings( Lists.newArrayList( - String.format("%s:%s", "8030", "8030"), - String.format("%s:%s", "9030", "9030"), - String.format("%s:%s", "9060", "9060"), - String.format("%s:%s", "8040", "8040"), - String.format("%s:%s", "9611", "9611"), - String.format("%s:%s", "9610", "9610"))); + String.format("%s:%s", FE.HTTP_PORT, FE.HTTP_PORT), + String.format("%s:%s", FE.QUERY_PORT, FE.QUERY_PORT), + String.format("%s:%s", BE.THRIFT_PORT, BE.THRIFT_PORT), + String.format("%s:%s", BE.WEBSERVICE_PORT, BE.WEBSERVICE_PORT), + String.format("%s:%s", FE.FLIGHT_SQL_PORT, FE.FLIGHT_SQL_PORT), + String.format("%s:%s", BE.FLIGHT_SQL_PORT, BE.FLIGHT_SQL_PORT))); return container; } @@ -99,6 +126,8 @@ public void startContainer() { LOG.info("Starting doris containers."); // singleton doris container dorisContainer.start(); + // Wait for container to reach running state during first startup + waitForContainerRunning(); initializeJdbcConnection(); initializeVariables(); printClusterStatus(); @@ -111,11 +140,21 @@ public void startContainer() { @Override public void restartContainer() { - LOG.info("Restart doris container."); - dorisContainer - .getDockerClient() - .restartContainerCmd(dorisContainer.getContainerId()) - .exec(); + LOG.info("Restarting Doris container..."); + + try (RestartContainerCmd restartCmd = + dorisContainer + .getDockerClient() + .restartContainerCmd(dorisContainer.getContainerId())) { + restartCmd.exec(); + LOG.info("Restart command executed, waiting for container services to be ready"); + waitForContainerRunning(); + } catch (Exception e) { + LOG.error("Failed to restart Doris container", e); + throw new RuntimeException("Container restart failed", e); + } + + LOG.info("Doris container successfully restarted and services are ready"); } @Override @@ -145,6 +184,81 @@ private void initializeVariables() throws Exception { LOG.info("Init variables successfully."); } + // wait for container running + private void waitForContainerRunning() { + LOG.info("Waiting for Doris services to be accessible..."); + + try { + Awaitility.await("FE HTTP Service") + .atMost(5, TimeUnit.MINUTES) + .pollInterval(1, TimeUnit.SECONDS) + .until( + () -> { + try { + ExecResult result = + dorisContainer.execInContainer( + "curl", + "-s", + "-o", + "/dev/null", + "-w", + "%{http_code}", + "-m", + "2", + "http://localhost:8030"); + boolean ready = result.getStdout().equals("200"); + LOG.info("FE HTTP service on port 8030 is ready: {}", ready); + if (ready) { + LOG.info("FE HTTP service on port 8030 is ready"); + } + return ready; + } catch (Exception e) { + LOG.debug( + "Exception while checking FE HTTP service: {}", + e.getMessage()); + return false; + } + }); + + Awaitility.await("BE HTTP Service") + .atMost(5, TimeUnit.MINUTES) + .pollInterval(1, TimeUnit.SECONDS) + .until( + () -> { + try { + ExecResult result = + dorisContainer.execInContainer( + "curl", + "-s", + "-o", + "/dev/null", + "-w", + "%{http_code}", + "-m", + "2", + "http://localhost:8040"); + boolean ready = "200".equals(result.getStdout().trim()); + if (ready) { + LOG.info("BE HTTP service on port 8040 is ready"); + } else { + LOG.debug( + "BE HTTP service on port 8040 not ready yet, HTTP status: {}", + result.getStdout().trim()); + } + return ready; + } catch (Exception e) { + LOG.debug( + "Exception while checking BE HTTP service: {}", + e.getMessage()); + return false; + } + }); + + } catch (ConditionTimeoutException e) { + LOG.warn("Timed out after 5 minutes waiting for Doris services to be ready"); + } + } + @Override public String getJdbcUrl() { return String.format(JDBC_URL, dorisContainer.getHost()); @@ -172,18 +286,20 @@ public String getPassword() { @Override public String getFenodes() { - return dorisContainer.getHost() + ":8030"; + return dorisContainer.getHost() + ":" + FE.HTTP_PORT; } @Override public String getBenodes() { - return dorisContainer.getHost() + ":8040"; + return dorisContainer.getHost() + ":" + BE.WEBSERVICE_PORT; } public void close() { - LOG.info("Doris container is about to be close."); - dorisContainer.close(); - LOG.info("Doris container closed successfully."); + if (dorisContainer != null) { + LOG.info("Doris container is about to be close."); + dorisContainer.close(); + LOG.info("Doris container closed successfully."); + } } private void initializeJDBCDriver() throws MalformedURLException { @@ -249,12 +365,12 @@ private void echo(String... cmd) { reader.close(); p.destroy(); } catch (Exception e) { - e.printStackTrace(); + LOG.info("Execute command in docker container failed. ", e); } } - private List convertList(ResultSet rs) throws SQLException { - List list = new ArrayList<>(); + private List> convertList(ResultSet rs) throws SQLException { + List> list = new ArrayList<>(); ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); while (rs.next()) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java index 728118032..70f7fb29c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java @@ -35,13 +35,16 @@ import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** DorisSink ITCase failover case */ @RunWith(Parameterized.class) @@ -160,12 +163,23 @@ && getFlinkJobStatus(jobClient).equals(JobStatus.FINISHED)) { faultInjectionClear(); } else if (FaultType.RESTART_FAILURE.equals(faultType)) { // docker image restart time is about 60s - randomSleepSec = randomSleepSec + 60; - dorisContainerService.restartContainer(); + int stabilizationTime = randomSleepMs + 60; LOG.info( - "Restarting doris cluster, sleep {}s before next restart", - randomSleepSec); - Thread.sleep(randomSleepSec * 1000); + "Restarting doris cluster, will wait {} s after restart completes for stabilization", + randomSleepMs); + dorisContainerService.restartContainer(); + LOG.info("Doris container restart completed, waiting for system stabilization"); + Awaitility.await("Post-restart stabilization") + .pollInterval(1, TimeUnit.SECONDS) + .pollDelay(Duration.ofSeconds(randomSleepMs)) + .atMost(Duration.ofSeconds(stabilizationTime + 10)) + .until( + () -> { + LOG.debug("System stabilization in progress..."); + return true; + }); + + LOG.info("System stabilization period completed after container restart"); } } else { // Avoid frequent queries diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 73f6e03ac..e00220800 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -37,6 +37,7 @@ import org.apache.doris.flink.cfg.DorisStreamOptions; import org.apache.doris.flink.container.AbstractITCaseService; import org.apache.doris.flink.container.ContainerUtils; +import org.apache.doris.flink.container.config.DorisPorts.FE; import org.apache.doris.flink.datastream.DorisSourceFunction; import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; import org.apache.doris.flink.table.DorisConfigOptions; @@ -92,7 +93,7 @@ public static Object[] parameters() { new Object[] { false, -1, }, - new Object[] {true, 9611} + new Object[] {true, FE.FLIGHT_SQL_PORT} }; } diff --git a/flink-doris-connector/src/test/resources/docker/doris/be.conf b/flink-doris-connector/src/test/resources/docker/doris/be.conf index 94b76e094..102f68049 100644 --- a/flink-doris-connector/src/test/resources/docker/doris/be.conf +++ b/flink-doris-connector/src/test/resources/docker/doris/be.conf @@ -19,13 +19,13 @@ CUR_DATE=`date +%Y%m%d-%H%M%S` PPROF_TMPDIR="$DORIS_HOME/log/" -JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" +JAVA_OPTS="-Xmx2048m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" # For jdk 9+, this JAVA_OPTS will be used as default JVM options -JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" +JAVA_OPTS_FOR_JDK_9="-Xmx2048m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" # For jdk 17+, this JAVA_OPTS will be used as default JVM options -JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED" +JAVA_OPTS_FOR_JDK_17="-Xmx2048m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED" # since 1.2, the JAVA_HOME need to be set to run BE process. # JAVA_HOME=/path/to/jdk/ @@ -43,7 +43,7 @@ be_port = 9060 webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 -arrow_flight_sql_port = 9610 +arrow_flight_sql_port = 10040 enable_debug_points = true # HTTPS configures diff --git a/flink-doris-connector/src/test/resources/docker/doris/fe.conf b/flink-doris-connector/src/test/resources/docker/doris/fe.conf index a45fb5359..c88b86620 100644 --- a/flink-doris-connector/src/test/resources/docker/doris/fe.conf +++ b/flink-doris-connector/src/test/resources/docker/doris/fe.conf @@ -25,6 +25,8 @@ CUR_DATE=`date +%Y%m%d-%H%M%S` # Log dir LOG_DIR = ${DORIS_HOME}/log +# For jdk 8 +JAVA_OPTS="-Dfile.encoding=UTF-8 -Djavax.security.auth.useSubjectCredsOnly=false -Xss4m -Xmx8192m -XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDateStamps -XX:+PrintGCDetails -Xloggc:$LOG_DIR/log/fe.gc.log.$CUR_DATE -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=50M -Dlog4j2.formatMsgNoLookups=true" # For jdk 17, this JAVA_OPTS will be used as default JVM options JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR -Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED" @@ -47,7 +49,7 @@ http_port = 8030 rpc_port = 9020 query_port = 9030 edit_log_port = 9010 -arrow_flight_sql_port = 9611 +arrow_flight_sql_port = 10030 enable_debug_points = true arrow_flight_token_cache_size = 50 # Choose one if there are more than one ip except loopback address. From 42b70ef9f62828679d31ed0e4e65228f58dddb47 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Mon, 17 Mar 2025 12:51:53 +0800 Subject: [PATCH 02/10] update workflow --- .../container/instance/DorisContainer.java | 22 ++++++++++++----- .../flink/sink/DorisSinkFailoverITCase.java | 24 ++++--------------- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index 4b62d3414..a22252650 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -148,6 +148,8 @@ public void restartContainer() { .restartContainerCmd(dorisContainer.getContainerId())) { restartCmd.exec(); LOG.info("Restart command executed, waiting for container services to be ready"); + // Add service detection logic to ensure Doris container services are fully initialized + // and ready. waitForContainerRunning(); } catch (Exception e) { LOG.error("Failed to restart Doris container", e); @@ -205,11 +207,16 @@ private void waitForContainerRunning() { "%{http_code}", "-m", "2", - "http://localhost:8030"); + "http://localhost:" + FE.HTTP_PORT); boolean ready = result.getStdout().equals("200"); - LOG.info("FE HTTP service on port 8030 is ready: {}", ready); + LOG.info( + "FE HTTP service on port {} is ready: {}", + FE.HTTP_PORT, + ready); if (ready) { - LOG.info("FE HTTP service on port 8030 is ready"); + LOG.info( + "FE HTTP service on port {} is ready", + FE.HTTP_PORT); } return ready; } catch (Exception e) { @@ -236,13 +243,16 @@ private void waitForContainerRunning() { "%{http_code}", "-m", "2", - "http://localhost:8040"); + "http://localhost:" + BE.WEBSERVICE_PORT); boolean ready = "200".equals(result.getStdout().trim()); if (ready) { - LOG.info("BE HTTP service on port 8040 is ready"); + LOG.info( + "BE HTTP service on port {} is ready", + BE.WEBSERVICE_PORT); } else { LOG.debug( - "BE HTTP service on port 8040 not ready yet, HTTP status: {}", + "BE HTTP service on port {} not ready yet, HTTP status: {}", + BE.WEBSERVICE_PORT, result.getStdout().trim()); } return ready; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java index 70f7fb29c..728118032 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java @@ -35,16 +35,13 @@ import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.shaded.org.awaitility.Awaitility; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; /** DorisSink ITCase failover case */ @RunWith(Parameterized.class) @@ -163,23 +160,12 @@ && getFlinkJobStatus(jobClient).equals(JobStatus.FINISHED)) { faultInjectionClear(); } else if (FaultType.RESTART_FAILURE.equals(faultType)) { // docker image restart time is about 60s - int stabilizationTime = randomSleepMs + 60; - LOG.info( - "Restarting doris cluster, will wait {} s after restart completes for stabilization", - randomSleepMs); + randomSleepSec = randomSleepSec + 60; dorisContainerService.restartContainer(); - LOG.info("Doris container restart completed, waiting for system stabilization"); - Awaitility.await("Post-restart stabilization") - .pollInterval(1, TimeUnit.SECONDS) - .pollDelay(Duration.ofSeconds(randomSleepMs)) - .atMost(Duration.ofSeconds(stabilizationTime + 10)) - .until( - () -> { - LOG.debug("System stabilization in progress..."); - return true; - }); - - LOG.info("System stabilization period completed after container restart"); + LOG.info( + "Restarting doris cluster, sleep {}s before next restart", + randomSleepSec); + Thread.sleep(randomSleepSec * 1000); } } else { // Avoid frequent queries From 6ab18bf45659e0eec1fd0a8b24bf5f2ec6eb1a5e Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Mon, 17 Mar 2025 12:59:09 +0800 Subject: [PATCH 03/10] update workflow --- .github/workflows/build-connector.yml | 3 ++- .github/workflows/run-e2ecase.yml | 3 +-- .github/workflows/run-itcase.yml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-connector.yml b/.github/workflows/build-connector.yml index dc52a7906..8e0fbdfae 100644 --- a/.github/workflows/build-connector.yml +++ b/.github/workflows/build-connector.yml @@ -23,8 +23,9 @@ on: jobs: build-extension: - name: "Build Connector (Flink ${{ matrix.flink-version }})" + name: "Build Connector" runs-on: ubuntu-latest + if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) }} strategy: matrix: flink-version: ['1.15', '1.16', '1.17', '1.18', '1.19', '1.20'] diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml index ab0727368..785a7f211 100644 --- a/.github/workflows/run-e2ecase.yml +++ b/.github/workflows/run-e2ecase.yml @@ -20,13 +20,12 @@ name: Run E2ECases on: pull_request: push: - branches: - - master jobs: build-extension: name: "Run E2ECases" runs-on: ubuntu-latest + if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) }} env: TZ: Asia/Shanghai defaults: diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml index 73cdacf5b..e489909f6 100644 --- a/.github/workflows/run-itcase.yml +++ b/.github/workflows/run-itcase.yml @@ -20,12 +20,12 @@ name: Run ITCases on: pull_request: push: - - master jobs: build-extension: name: "Run ITCases" runs-on: ubuntu-latest + if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) }} strategy: matrix: java-version: [8] From fc3b4e195082497f7dd73658401f57e8e6977248 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Wed, 19 Mar 2025 16:30:21 +0800 Subject: [PATCH 04/10] update container setting --- .github/workflows/build-connector.yml | 65 +++++++++++-------- .../container/instance/DorisContainer.java | 24 ++++--- 2 files changed, 53 insertions(+), 36 deletions(-) diff --git a/.github/workflows/build-connector.yml b/.github/workflows/build-connector.yml index 8e0fbdfae..3d3fff374 100644 --- a/.github/workflows/build-connector.yml +++ b/.github/workflows/build-connector.yml @@ -26,28 +26,6 @@ jobs: name: "Build Connector" runs-on: ubuntu-latest if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) }} - strategy: - matrix: - flink-version: ['1.15', '1.16', '1.17', '1.18', '1.19', '1.20'] - include: - - flink-version: '1.15' - flink-full-version: '1.15.0' - flink-python-id: 'flink-python_2.12' - - flink-version: '1.16' - flink-full-version: '1.16.0' - flink-python-id: 'flink-python' - - flink-version: '1.17' - flink-full-version: '1.17.0' - flink-python-id: 'flink-python' - - flink-version: '1.18' - flink-full-version: '1.18.0' - flink-python-id: 'flink-python' - - flink-version: '1.19' - flink-full-version: '1.19.0' - flink-python-id: 'flink-python' - - flink-version: '1.20' - flink-full-version: '1.20.0' - flink-python-id: 'flink-python' defaults: run: shell: bash @@ -71,9 +49,44 @@ jobs: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} ${{ runner.os }}-maven- - - name: Build Flink Connector + - name: Build flink connector 1.15 run: | cd flink-doris-connector && mvn clean package \ - -Dflink.version=${{ matrix.flink-full-version }} \ - -Dflink.minor.version=${{ matrix.flink-version }} \ - -Dflink.python.id=${{ matrix.flink-python-id }} \ No newline at end of file + -Dflink.version=1.15.0 \ + -Dflink.minor.version=1.15 \ + -Dflink.python.id=flink-python_2.12 + + - name: Build flink connector 1.16 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.16.0 \ + -Dflink.minor.version=1.16 \ + -Dflink.python.id=flink-python + + - name: Build flink connector 1.17 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.17.0 \ + -Dflink.minor.version=1.17 \ + -Dflink.python.id=flink-python + + - name: Build flink connector 1.18 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.18.0 \ + -Dflink.minor.version=1.18 \ + -Dflink.python.id=flink-python + + - name: Build flink connector 1.19 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.19.0 \ + -Dflink.minor.version=1.19 \ + -Dflink.python.id=flink-python + + - name: Build flink connector 1.20 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.20.0 \ + -Dflink.minor.version=1.20 \ + -Dflink.python.id=flink-python \ No newline at end of file diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index a22252650..c2e9f9bed 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -24,6 +24,7 @@ import org.apache.doris.flink.exception.DorisRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -32,7 +33,6 @@ import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException; import org.testcontainers.utility.DockerLoggerFactory; -import org.testcontainers.utility.MountableFile; import java.io.BufferedReader; import java.io.InputStream; @@ -88,13 +88,14 @@ public GenericContainer createDorisContainer() { "sh", "-c", "chmod -R 644 /root/be/conf/be.conf /root/fe/conf/fe.conf && chmod -R 755 /root/be/conf /root/fe/conf && chown -R root:root /root/be/conf /root/fe/conf") - // use customer conf - .withCopyFileToContainer( - MountableFile.forClasspathResource("docker/doris/be.conf"), - "/opt/apache-doris/be/conf/be.conf") - .withCopyFileToContainer( - MountableFile.forClasspathResource("docker/doris/fe.conf"), - "/opt/apache-doris/fe/conf/fe.conf") + .withClasspathResourceMapping( + "docker/doris/be.conf", + "/root/be/conf/be.conf", + BindMode.READ_WRITE) + .withClasspathResourceMapping( + "docker/doris/fe.conf", + "/root/fe/conf/fe.conf", + BindMode.READ_WRITE) // These exposed ports are used to connect to Doris. They are the default // ports for yagagagaga/doris-standalone:2.1.7. // For more information, see: @@ -107,8 +108,7 @@ public GenericContainer createDorisContainer() { FE.FLIGHT_SQL_PORT, BE.FLIGHT_SQL_PORT) .withStartupTimeout(Duration.ofMinutes(5)) - .withEnv("TZ", systemTimeZone) - .waitingFor(Wait.forListeningPort()); + .withEnv("TZ", systemTimeZone); container.setPortBindings( Lists.newArrayList( @@ -128,6 +128,10 @@ public void startContainer() { dorisContainer.start(); // Wait for container to reach running state during first startup waitForContainerRunning(); + ExecResult feExecResult = dorisContainer.execInContainer("cat", "/root/fe/conf/fe.conf"); + ExecResult beExecResult = dorisContainer.execInContainer("cat", "/root/be/conf/be.conf"); + LOG.info("FE config: {}", feExecResult.getStdout()); + LOG.info("BE config: {}", beExecResult.getStdout()); initializeJdbcConnection(); initializeVariables(); printClusterStatus(); From 03373999e95317c793b3fc042ab8a3a5fdfa8125 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Wed, 19 Mar 2025 16:32:33 +0800 Subject: [PATCH 05/10] avoid memory leak --- .../container/e2e/Doris2DorisE2ECase.java | 5 +- .../container/instance/DorisContainer.java | 195 +++++++++--------- 2 files changed, 100 insertions(+), 100 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index aeecf2992..ece9fc421 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -31,7 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.ZoneId; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -62,9 +61,7 @@ public void testDoris2Doris() throws Exception { env.setParallelism(2); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - tEnv.getConfig() - .getConfiguration() - .setString("table.local-time-zone", ZoneId.systemDefault().getId()); + tEnv.getConfig().getConfiguration().setString("table.local-time-zone", "Asia/Tokyo"); String sourceDDL = String.format( "CREATE TABLE doris_source (" diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index c2e9f9bed..df7411af4 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -29,12 +29,12 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException; import org.testcontainers.utility.DockerLoggerFactory; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.MalformedURLException; @@ -53,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; public class DorisContainer implements ContainerService { @@ -69,6 +70,8 @@ public class DorisContainer implements ContainerService { private static final String PASSWORD = ""; private final GenericContainer dorisContainer; private final String systemTimeZone = ZoneId.systemDefault().getId(); + private static URLClassLoader jdbcClassLoader; + private static final AtomicBoolean driverInitialized = new AtomicBoolean(false); public DorisContainer() { dorisContainer = createDorisContainer(); @@ -123,13 +126,17 @@ public GenericContainer createDorisContainer() { public void startContainer() { try { + if (dorisContainer.isRunning()) { + return; + } LOG.info("Starting doris containers."); // singleton doris container dorisContainer.start(); - // Wait for container to reach running state during first startup - waitForContainerRunning(); - ExecResult feExecResult = dorisContainer.execInContainer("cat", "/root/fe/conf/fe.conf"); - ExecResult beExecResult = dorisContainer.execInContainer("cat", "/root/be/conf/be.conf"); + // print Doris configuration information. + ExecResult feExecResult = + dorisContainer.execInContainer("cat", "/root/fe/conf/fe.conf"); + ExecResult beExecResult = + dorisContainer.execInContainer("cat", "/root/be/conf/be.conf"); LOG.info("FE config: {}", feExecResult.getStdout()); LOG.info("BE config: {}", beExecResult.getStdout()); initializeJdbcConnection(); @@ -152,9 +159,7 @@ public void restartContainer() { .restartContainerCmd(dorisContainer.getContainerId())) { restartCmd.exec(); LOG.info("Restart command executed, waiting for container services to be ready"); - // Add service detection logic to ensure Doris container services are fully initialized - // and ready. - waitForContainerRunning(); + initializeJdbcConnection(); } catch (Exception e) { LOG.error("Failed to restart Doris container", e); throw new RuntimeException("Container restart failed", e); @@ -190,89 +195,6 @@ private void initializeVariables() throws Exception { LOG.info("Init variables successfully."); } - // wait for container running - private void waitForContainerRunning() { - LOG.info("Waiting for Doris services to be accessible..."); - - try { - Awaitility.await("FE HTTP Service") - .atMost(5, TimeUnit.MINUTES) - .pollInterval(1, TimeUnit.SECONDS) - .until( - () -> { - try { - ExecResult result = - dorisContainer.execInContainer( - "curl", - "-s", - "-o", - "/dev/null", - "-w", - "%{http_code}", - "-m", - "2", - "http://localhost:" + FE.HTTP_PORT); - boolean ready = result.getStdout().equals("200"); - LOG.info( - "FE HTTP service on port {} is ready: {}", - FE.HTTP_PORT, - ready); - if (ready) { - LOG.info( - "FE HTTP service on port {} is ready", - FE.HTTP_PORT); - } - return ready; - } catch (Exception e) { - LOG.debug( - "Exception while checking FE HTTP service: {}", - e.getMessage()); - return false; - } - }); - - Awaitility.await("BE HTTP Service") - .atMost(5, TimeUnit.MINUTES) - .pollInterval(1, TimeUnit.SECONDS) - .until( - () -> { - try { - ExecResult result = - dorisContainer.execInContainer( - "curl", - "-s", - "-o", - "/dev/null", - "-w", - "%{http_code}", - "-m", - "2", - "http://localhost:" + BE.WEBSERVICE_PORT); - boolean ready = "200".equals(result.getStdout().trim()); - if (ready) { - LOG.info( - "BE HTTP service on port {} is ready", - BE.WEBSERVICE_PORT); - } else { - LOG.debug( - "BE HTTP service on port {} not ready yet, HTTP status: {}", - BE.WEBSERVICE_PORT, - result.getStdout().trim()); - } - return ready; - } catch (Exception e) { - LOG.debug( - "Exception while checking BE HTTP service: {}", - e.getMessage()); - return false; - } - }); - - } catch (ConditionTimeoutException e) { - LOG.warn("Timed out after 5 minutes waiting for Doris services to be ready"); - } - } - @Override public String getJdbcUrl() { return String.format(JDBC_URL, dorisContainer.getHost()); @@ -314,18 +236,34 @@ public void close() { dorisContainer.close(); LOG.info("Doris container closed successfully."); } + + closeJdbcClassLoader(); } private void initializeJDBCDriver() throws MalformedURLException { - URLClassLoader urlClassLoader = - new URLClassLoader( - new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader()); - LOG.info("Try to connect to Doris."); - Thread.currentThread().setContextClassLoader(urlClassLoader); + // Checks if the driver has already been initialized to avoid memory leak and class loading + // issues. + if (driverInitialized.get()) { + LOG.debug("JDBC driver already initialized, skipping initialization"); + return; + } + + LOG.info("Initializing JDBC driver"); + if (jdbcClassLoader == null) { + jdbcClassLoader = + new URLClassLoader( + new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader()); + } + + Thread.currentThread().setContextClassLoader(jdbcClassLoader); + driverInitialized.set(true); } private void initializeJdbcConnection() throws Exception { initializeJDBCDriver(); + // before connecting to Doris, wait for Doris FE to start,which is to avoid connect Doris + // failed when Doris FE is not ready. + waitDorisFeRunning(); try (Connection connection = getQueryConnection(); Statement statement = connection.createStatement()) { ResultSet resultSet; @@ -337,6 +275,71 @@ private void initializeJdbcConnection() throws Exception { LOG.info("Connected to Doris successfully."); } + private synchronized void closeJdbcClassLoader() { + if (jdbcClassLoader != null) { + try { + jdbcClassLoader.close(); + jdbcClassLoader = null; + driverInitialized.set(false); + LOG.info("JDBC class loader closed successfully"); + } catch (IOException e) { + LOG.warn("Failed to close JDBC class loader", e); + } + } + } + + /** + * Wait for Doris container to running. If the Doris FE not startup completely, try to connect + * it again until the doris FE is ready. + */ + private void waitDorisFeRunning() { + LOG.info("Waiting for Doris services to be accessible..."); + + // Poll Doris FE HTTP service every second with a maximum wait time of 5 minutes + // If the service is not available within this time, a timeout exception will be thrown + try { + Awaitility.await("FE HTTP Service") + .atMost(5, TimeUnit.MINUTES) + .pollInterval(1, TimeUnit.SECONDS) + .until( + () -> { + try { + ExecResult result = + dorisContainer.execInContainer( + "curl", + "-s", + "-o", + "/dev/null", + "-w", + "%{http_code}", + "-m", + "2", + "http://localhost:" + FE.HTTP_PORT); + boolean ready = result.getStdout().equals("200"); + LOG.info( + "FE HTTP service on port {} is ready: {}", + FE.HTTP_PORT, + ready); + + if (ready) { + LOG.info( + "FE HTTP service on port {} is ready", + FE.HTTP_PORT); + } + return ready; + } catch (Exception e) { + LOG.debug( + "Exception while checking FE HTTP service: {}", + e.getMessage()); + return false; + } + }); + + } catch (ConditionTimeoutException e) { + LOG.warn("Timed out after 5 minutes waiting for Doris services to be ready"); + } + } + private boolean isBeReady(ResultSet rs, Duration duration) throws SQLException { LockSupport.parkNanos(duration.toNanos()); if (rs.next()) { From b1043df66d7bfdc718cfb86b164261d93b3ee449 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Wed, 19 Mar 2025 21:27:52 +0800 Subject: [PATCH 06/10] delete useless setting --- .../org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index ece9fc421..59ad438c7 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -61,7 +61,6 @@ public void testDoris2Doris() throws Exception { env.setParallelism(2); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - tEnv.getConfig().getConfiguration().setString("table.local-time-zone", "Asia/Tokyo"); String sourceDDL = String.format( "CREATE TABLE doris_source (" From 98b305043d7da36099d6ff52814bd7a3ebca0bc5 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 20 Mar 2025 12:16:18 +0800 Subject: [PATCH 07/10] modify job status --- .../doris/flink/container/AbstractITCaseService.java | 8 ++++++-- .../doris/flink/sink/DorisSinkMultiTblFailoverITCase.java | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java index e3d709f51..28786d466 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java @@ -46,17 +46,21 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase { protected static void waitForJobStatus( JobClient client, List expectedStatus, Deadline deadline) throws Exception { + LOG.info("Waiting for job to reach one of the expected states: {}", expectedStatus); waitUntilCondition( () -> { JobStatus currentStatus; try { currentStatus = (JobStatus) client.getJobStatus().get(); } catch (IllegalStateException e) { - LOG.warn("Failed to get state, cause " + e.getMessage()); - currentStatus = JobStatus.FINISHED; + LOG.warn("Failed to get job status: {}", e.getMessage()); + return false; } + LOG.debug("Current job status: {}", currentStatus); + if (expectedStatus.contains(currentStatus)) { + LOG.info("Job reached expected status: {}", currentStatus); return true; } else if (currentStatus.isTerminalState()) { try { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java index f41469902..3d1d3f367 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java @@ -129,7 +129,7 @@ public void cancel() {} mockSource.sinkTo(builder.build()); JobClient jobClient = env.executeAsync(); CompletableFuture jobStatus = jobClient.getJobStatus(); - LOG.info("Job status: {}", jobStatus); + LOG.info("Job status: {}", jobStatus.get()); waitForJobStatus( jobClient, @@ -144,7 +144,7 @@ public void cancel() {} JobStatus.FAILED, JobStatus.RESTARTING); - waitForJobStatus(jobClient, errorStatus, Deadline.fromNow(Duration.ofSeconds(30))); + waitForJobStatus(jobClient, errorStatus, Deadline.fromNow(Duration.ofSeconds(60))); LOG.info("start to create add table"); initializeTable(TABLE_MULTI_CSV_NO_EXIST_TBL); @@ -165,7 +165,7 @@ public void cancel() {} String queryRes = String.format( "select id,task_id from %s.%s ", DATABASE, TABLE_MULTI_CSV_NO_EXIST_TBL); - List expected = Arrays.asList("1,3"); + List expected = Collections.singletonList("1,3"); if (!batchMode) { ContainerUtils.checkResult( From e048729335022cd73972024b41d592a8ccd355e2 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 20 Mar 2025 13:09:49 +0800 Subject: [PATCH 08/10] modify job status --- .github/workflows/run-itcase.yml | 16 +++++++++++++++- .../flink/container/instance/DorisContainer.java | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml index e489909f6..13638ef38 100644 --- a/.github/workflows/run-itcase.yml +++ b/.github/workflows/run-itcase.yml @@ -31,6 +31,7 @@ jobs: java-version: [8] env: TZ: Asia/Shanghai + DORIS_IMAGE: "yagagagaga/doris-standalone:2.1.7" defaults: run: shell: bash @@ -44,6 +45,7 @@ jobs: with: java-version: ${{ matrix.java-version }} distribution: 'temurin' + cache: 'maven' - name: Cache Maven Repository uses: actions/cache@v4 @@ -53,6 +55,18 @@ jobs: restore-keys: | ${{ runner.os }}-maven- + - name: Free Disk Space + run: | + df -h + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + sudo rm -rf /opt/hostedtoolcache/CodeQL + df -h + + - name: Pull Doris Docker Image + run: docker pull ${DORIS_IMAGE} + - name: Run ITCases run: | - cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="yagagagaga/doris-standalone:2.1.7" \ No newline at end of file + cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="yagagagaga/doris-standalone:2.1.7" -B -V -Dfail-fast=true -DfailIfNoTests=true \ No newline at end of file diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index df7411af4..19f420fe3 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -58,7 +58,7 @@ public class DorisContainer implements ContainerService { private static final Logger LOG = LoggerFactory.getLogger(DorisContainer.class); - private static final String DEFAULT_DOCKER_IMAGE = "yagagagaga/doris-standalone:2.1.7"; + private static final String DEFAULT_DOCKER_IMAGE = "yagagagaga/doris-standalone:3.0.4"; private static final String DORIS_DOCKER_IMAGE = System.getProperty("image") == null ? DEFAULT_DOCKER_IMAGE From e0ea50d96a5f9014b1ea925a390a69666a60a681 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 20 Mar 2025 13:39:15 +0800 Subject: [PATCH 09/10] modify job status --- .../doris/flink/container/AbstractITCaseService.java | 2 +- .../flink/sink/DorisSinkMultiTblFailoverITCase.java | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java index 28786d466..65d0afe6c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java @@ -54,7 +54,7 @@ protected static void waitForJobStatus( currentStatus = (JobStatus) client.getJobStatus().get(); } catch (IllegalStateException e) { LOG.warn("Failed to get job status: {}", e.getMessage()); - return false; + currentStatus = JobStatus.FINISHED; } LOG.debug("Current job status: {}", currentStatus); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java index 3d1d3f367..107f549e2 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java @@ -129,7 +129,7 @@ public void cancel() {} mockSource.sinkTo(builder.build()); JobClient jobClient = env.executeAsync(); CompletableFuture jobStatus = jobClient.getJobStatus(); - LOG.info("Job status: {}", jobStatus.get()); + LOG.info("batch Mode:{} Job status: {}", batchMode, jobStatus.get()); waitForJobStatus( jobClient, @@ -143,20 +143,20 @@ public void cancel() {} JobStatus.CANCELED, JobStatus.FAILED, JobStatus.RESTARTING); - + LOG.info("batch Mode:{} wait job error status", batchMode); waitForJobStatus(jobClient, errorStatus, Deadline.fromNow(Duration.ofSeconds(60))); - LOG.info("start to create add table"); + LOG.info("batch Mode:{} start to create add table", batchMode); initializeTable(TABLE_MULTI_CSV_NO_EXIST_TBL); - LOG.info("wait job restart success"); + LOG.info("batch Mode:{} wait job restart success", batchMode); // wait table restart success waitForJobStatus( jobClient, Collections.singletonList(RUNNING), Deadline.fromNow(Duration.ofSeconds(60))); - LOG.info("wait job running finished"); + LOG.info("batch Mode:{} wait job running finished", batchMode); waitForJobStatus( jobClient, Collections.singletonList(FINISHED), @@ -168,9 +168,11 @@ public void cancel() {} List expected = Collections.singletonList("1,3"); if (!batchMode) { + LOG.info("check stream mode result!"); ContainerUtils.checkResult( getDorisQueryConnection(), LOG, expected, queryRes, 2, false); } else { + LOG.info("check batch mode result!"); List actualResult = ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, queryRes, 2); LOG.info("actual size: {}, expected size: {}", actualResult.size(), expected.size()); From 0aae75ef8ed58fc67afb6c477209db831b5a05dd Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 20 Mar 2025 14:36:31 +0800 Subject: [PATCH 10/10] modify job status --- .github/workflows/run-e2ecase.yml | 14 ++++++ .../container/instance/DorisContainer.java | 46 +++---------------- 2 files changed, 21 insertions(+), 39 deletions(-) diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml index 785a7f211..17987d1d9 100644 --- a/.github/workflows/run-e2ecase.yml +++ b/.github/workflows/run-e2ecase.yml @@ -28,6 +28,7 @@ jobs: if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) }} env: TZ: Asia/Shanghai + DORIS_IMAGE: "yagagagaga/doris-standalone:2.1.7" defaults: run: shell: bash @@ -40,6 +41,7 @@ jobs: with: distribution: 'temurin' java-version: '8' + cache: 'maven' - name: Cache Maven Repository uses: actions/cache@v4 @@ -49,6 +51,18 @@ jobs: restore-keys: | ${{ runner.os }}-maven- + - name: Free Disk Space + run: | + df -h + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + sudo rm -rf /opt/hostedtoolcache/CodeQL + df -h + + - name: Pull Doris Docker Image + run: docker pull ${DORIS_IMAGE} + - name: Run E2ECases run: | cd flink-doris-connector && mvn test -Dtest="*E2ECase" -Dimage="yagagagaga/doris-standalone:2.1.7" \ No newline at end of file diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index 19f420fe3..d0f87bde2 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -34,7 +34,6 @@ import org.testcontainers.utility.DockerLoggerFactory; import java.io.BufferedReader; -import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.MalformedURLException; @@ -53,7 +52,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; public class DorisContainer implements ContainerService { @@ -70,8 +68,6 @@ public class DorisContainer implements ContainerService { private static final String PASSWORD = ""; private final GenericContainer dorisContainer; private final String systemTimeZone = ZoneId.systemDefault().getId(); - private static URLClassLoader jdbcClassLoader; - private static final AtomicBoolean driverInitialized = new AtomicBoolean(false); public DorisContainer() { dorisContainer = createDorisContainer(); @@ -126,9 +122,6 @@ public GenericContainer createDorisContainer() { public void startContainer() { try { - if (dorisContainer.isRunning()) { - return; - } LOG.info("Starting doris containers."); // singleton doris container dorisContainer.start(); @@ -139,6 +132,7 @@ public void startContainer() { dorisContainer.execInContainer("cat", "/root/be/conf/be.conf"); LOG.info("FE config: {}", feExecResult.getStdout()); LOG.info("BE config: {}", beExecResult.getStdout()); + waitDorisFeRunning(); initializeJdbcConnection(); initializeVariables(); printClusterStatus(); @@ -159,7 +153,7 @@ public void restartContainer() { .restartContainerCmd(dorisContainer.getContainerId())) { restartCmd.exec(); LOG.info("Restart command executed, waiting for container services to be ready"); - initializeJdbcConnection(); + waitDorisFeRunning(); } catch (Exception e) { LOG.error("Failed to restart Doris container", e); throw new RuntimeException("Container restart failed", e); @@ -236,27 +230,14 @@ public void close() { dorisContainer.close(); LOG.info("Doris container closed successfully."); } - - closeJdbcClassLoader(); } private void initializeJDBCDriver() throws MalformedURLException { - // Checks if the driver has already been initialized to avoid memory leak and class loading - // issues. - if (driverInitialized.get()) { - LOG.debug("JDBC driver already initialized, skipping initialization"); - return; - } - - LOG.info("Initializing JDBC driver"); - if (jdbcClassLoader == null) { - jdbcClassLoader = - new URLClassLoader( - new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader()); - } - - Thread.currentThread().setContextClassLoader(jdbcClassLoader); - driverInitialized.set(true); + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader()); + LOG.info("Try to connect to Doris."); + Thread.currentThread().setContextClassLoader(urlClassLoader); } private void initializeJdbcConnection() throws Exception { @@ -275,19 +256,6 @@ private void initializeJdbcConnection() throws Exception { LOG.info("Connected to Doris successfully."); } - private synchronized void closeJdbcClassLoader() { - if (jdbcClassLoader != null) { - try { - jdbcClassLoader.close(); - jdbcClassLoader = null; - driverInitialized.set(false); - LOG.info("JDBC class loader closed successfully"); - } catch (IOException e) { - LOG.warn("Failed to close JDBC class loader", e); - } - } - } - /** * Wait for Doris container to running. If the Doris FE not startup completely, try to connect * it again until the doris FE is ready.