diff --git a/.github/workflows/build-connector.yml b/.github/workflows/build-connector.yml index 2b2a242be..3d3fff374 100644 --- a/.github/workflows/build-connector.yml +++ b/.github/workflows/build-connector.yml @@ -25,57 +25,68 @@ jobs: build-extension: name: "Build Connector" runs-on: ubuntu-latest + if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) }} defaults: run: shell: bash steps: - - name: Checkout - uses: actions/checkout@master + - name: Checkout + uses: actions/checkout@v4 - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '8' + cache: 'maven' - - name: Build flink connector 1.15 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.15.0 \ - -Dflink.minor.version=1.15 \ - -Dflink.python.id=flink-python_2.12 + - name: Cache Maven Repository + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + ${{ runner.os }}-maven- - - name: Build flink connector 1.16 - run: | - cd flink-doris-connector && mvn clean package \ + - name: Build flink connector 1.15 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.15.0 \ + -Dflink.minor.version=1.15 \ + -Dflink.python.id=flink-python_2.12 + + - name: Build flink connector 1.16 + run: | + cd flink-doris-connector && mvn clean package \ -Dflink.version=1.16.0 \ -Dflink.minor.version=1.16 \ -Dflink.python.id=flink-python - - name: Build flink connector 1.17 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.17.0 \ - -Dflink.minor.version=1.17 \ - -Dflink.python.id=flink-python + - name: Build flink connector 1.17 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.17.0 \ + -Dflink.minor.version=1.17 \ + -Dflink.python.id=flink-python - - name: Build flink connector 1.18 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.18.0 \ - -Dflink.minor.version=1.18 \ - -Dflink.python.id=flink-python + - name: Build flink connector 1.18 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.18.0 \ + -Dflink.minor.version=1.18 \ + -Dflink.python.id=flink-python - - name: Build flink connector 1.19 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.19.0 \ - -Dflink.minor.version=1.19 \ - -Dflink.python.id=flink-python + - name: Build flink connector 1.19 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.19.0 \ + -Dflink.minor.version=1.19 \ + -Dflink.python.id=flink-python - - name: Build flink connector 1.20 - run: | - cd flink-doris-connector && mvn clean package \ - -Dflink.version=1.20.0 \ - -Dflink.minor.version=1.20 \ - -Dflink.python.id=flink-python + - name: Build flink connector 1.20 + run: | + cd flink-doris-connector && mvn clean package \ + -Dflink.version=1.20.0 \ + -Dflink.minor.version=1.20 \ + -Dflink.python.id=flink-python \ No newline at end of file diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index 325ed71a6..4b3e35a14 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -27,17 +27,25 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: persist-credentials: false submodules: recursive - name: Setup java - uses: actions/setup-java@v2 + uses: actions/setup-java@v4 with: - distribution: adopt + distribution: 'temurin' java-version: '8' + - name: Cache Maven Repository + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}-checkstyle + restore-keys: | + ${{ runner.os }}-maven- + - name: Run java checkstyle - run: - cd flink-doris-connector && mvn clean compile checkstyle:checkstyle \ No newline at end of file + run: | + cd flink-doris-connector && mvn clean compile checkstyle:checkstyle -Dcheckstyle.skip=false -Dspotless.check.skip=false \ No newline at end of file diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml index c9b4cd253..6ea6d3bf6 100644 --- a/.github/workflows/license-eyes.yml +++ b/.github/workflows/license-eyes.yml @@ -28,8 +28,11 @@ jobs: runs-on: ubuntu-latest steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@v2 + uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Check License - uses: apache/skywalking-eyes@v0.2.0 + uses: apache/skywalking-eyes@v0.5.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml index e6318f8f9..17987d1d9 100644 --- a/.github/workflows/run-e2ecase.yml +++ b/.github/workflows/run-e2ecase.yml @@ -25,20 +25,44 @@ jobs: build-extension: name: "Run E2ECases" runs-on: ubuntu-latest + if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) }} + env: + TZ: Asia/Shanghai + DORIS_IMAGE: "yagagagaga/doris-standalone:2.1.7" defaults: run: shell: bash steps: - - name: Checkout - uses: actions/checkout@master + - name: Checkout + uses: actions/checkout@v4 - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' + - name: Setup java + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '8' + cache: 'maven' - - name: Run E2ECases - run: | - cd flink-doris-connector && mvn test -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + - name: Cache Maven Repository + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}-e2e + restore-keys: | + ${{ runner.os }}-maven- + - name: Free Disk Space + run: | + df -h + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + sudo rm -rf /opt/hostedtoolcache/CodeQL + df -h + + - name: Pull Doris Docker Image + run: docker pull ${DORIS_IMAGE} + + - name: Run E2ECases + run: | + cd flink-doris-connector && mvn test -Dtest="*E2ECase" -Dimage="yagagagaga/doris-standalone:2.1.7" \ No newline at end of file diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml index 0b6706560..13638ef38 100644 --- a/.github/workflows/run-itcase.yml +++ b/.github/workflows/run-itcase.yml @@ -25,20 +25,48 @@ jobs: build-extension: name: "Run ITCases" runs-on: ubuntu-latest + if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) }} + strategy: + matrix: + java-version: [8] + env: + TZ: Asia/Shanghai + DORIS_IMAGE: "yagagagaga/doris-standalone:2.1.7" defaults: run: shell: bash + steps: - - name: Checkout - uses: actions/checkout@master + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup java + uses: actions/setup-java@v4 + with: + java-version: ${{ matrix.java-version }} + distribution: 'temurin' + cache: 'maven' + + - name: Cache Maven Repository + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' + - name: Free Disk Space + run: | + df -h + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + sudo rm -rf /opt/hostedtoolcache/CodeQL + df -h - - name: Run ITCases - run: | - cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + - name: Pull Doris Docker Image + run: docker pull ${DORIS_IMAGE} + - name: Run ITCases + run: | + cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="yagagagaga/doris-standalone:2.1.7" -B -V -Dfail-fast=true -DfailIfNoTests=true \ No newline at end of file diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 26771c9d9..38fa7024e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -119,7 +119,7 @@ public static boolean tryHttpConnection(String host) { responseMessage); return false; } catch (Exception ex) { - LOG.warn("Failed to connect to host:{}", host, ex); + LOG.warn("Failed to connect to host: {} ", host, ex); return false; } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java index e3d709f51..65d0afe6c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java @@ -46,17 +46,21 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase { protected static void waitForJobStatus( JobClient client, List expectedStatus, Deadline deadline) throws Exception { + LOG.info("Waiting for job to reach one of the expected states: {}", expectedStatus); waitUntilCondition( () -> { JobStatus currentStatus; try { currentStatus = (JobStatus) client.getJobStatus().get(); } catch (IllegalStateException e) { - LOG.warn("Failed to get state, cause " + e.getMessage()); + LOG.warn("Failed to get job status: {}", e.getMessage()); currentStatus = JobStatus.FINISHED; } + LOG.debug("Current job status: {}", currentStatus); + if (expectedStatus.contains(currentStatus)) { + LOG.info("Job reached expected status: {}", currentStatus); return true; } else if (currentStatus.isTerminalState()) { try { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/config/DorisPorts.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/config/DorisPorts.java new file mode 100644 index 000000000..85408c05e --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/config/DorisPorts.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.container.config; + +/** + * DorisPorts class contains default port configurations for Apache Doris Can be used in E2E testing + * and other scenarios requiring access to Doris services + */ +public class DorisPorts { + /** FE (Frontend) related ports */ + public static final class FE { + + public static final int HTTP_PORT = 8030; + public static final int QUERY_PORT = 9030; + public static final int FLIGHT_SQL_PORT = 10030; + } + + /** BE (Backend) related ports */ + public static final class BE { + + public static final int THRIFT_PORT = 9060; + public static final int WEBSERVICE_PORT = 8040; + public static final int FLIGHT_SQL_PORT = 10040; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index 1b56be220..59ad438c7 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -23,6 +23,7 @@ import org.apache.doris.flink.container.AbstractContainerTestBase; import org.apache.doris.flink.container.ContainerUtils; +import org.apache.doris.flink.container.config.DorisPorts.FE; import org.apache.doris.flink.table.DorisConfigOptions; import org.junit.Test; import org.junit.runner.RunWith; @@ -60,7 +61,6 @@ public void testDoris2Doris() throws Exception { env.setParallelism(2); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - String sourceDDL = String.format( "CREATE TABLE doris_source (" @@ -96,13 +96,14 @@ public void testDoris2Doris() throws Exception { + " 'username' = '%s'," + " 'password' = '%s'," + " 'source.use-flight-sql' = '%s',\n" - + " 'source.flight-sql-port' = '9611'" + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE_SOURCE + "." + TABLE, getDorisUsername(), getDorisPassword(), - useFlightRead); + useFlightRead, + FE.FLIGHT_SQL_PORT); tEnv.executeSql(sourceDDL); String sinkDDL = @@ -150,9 +151,9 @@ public void testDoris2Doris() throws Exception { List excepted = Arrays.asList( - "1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello, Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", \"key2\":\"value2\"},{\"name\": \"Tom\", \"age\": 30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}", - "2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", \"k2\":\"v2\"},{\"name\": \"Jerry\", \"age\": 25},{\"status\":\"ok\"},{\"data\":[1,2,3]}", - "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\": \"Alice\", \"age\": 40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}", + "1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello, Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", \"key2\":\"value2\"},{\"name\":\"Tom\", \"age\":30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}", + "2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", \"k2\":\"v2\"},{\"name\":\"Jerry\", \"age\":25},{\"status\":\"ok\"},{\"data\":[1,2,3]}", + "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\":\"Alice\", \"age\":40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}", "4,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null"); String query = String.format("SELECT * FROM %s.%s", DATABASE_SINK, TABLE); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index 1bd483c3c..d0f87bde2 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -17,15 +17,21 @@ package org.apache.doris.flink.container.instance; +import com.github.dockerjava.api.command.RestartContainerCmd; import com.google.common.collect.Lists; +import org.apache.doris.flink.container.config.DorisPorts.BE; +import org.apache.doris.flink.container.config.DorisPorts.FE; import org.apache.doris.flink.exception.DorisRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException; import org.testcontainers.utility.DockerLoggerFactory; -import org.testcontainers.utility.MountableFile; import java.io.BufferedReader; import java.io.InputStream; @@ -40,15 +46,17 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; public class DorisContainer implements ContainerService { private static final Logger LOG = LoggerFactory.getLogger(DorisContainer.class); - private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0"; + private static final String DEFAULT_DOCKER_IMAGE = "yagagagaga/doris-standalone:3.0.4"; private static final String DORIS_DOCKER_IMAGE = System.getProperty("image") == null ? DEFAULT_DOCKER_IMAGE @@ -58,15 +66,16 @@ public class DorisContainer implements ContainerService { private static final String JDBC_URL = "jdbc:mysql://%s:9030"; private static final String USERNAME = "root"; private static final String PASSWORD = ""; - private final GenericContainer dorisContainer; + private final GenericContainer dorisContainer; + private final String systemTimeZone = ZoneId.systemDefault().getId(); public DorisContainer() { dorisContainer = createDorisContainer(); } - public GenericContainer createDorisContainer() { + public GenericContainer createDorisContainer() { LOG.info("Will create doris containers."); - GenericContainer container = + GenericContainer container = new GenericContainer<>(DORIS_DOCKER_IMAGE) .withNetwork(Network.newNetwork()) .withNetworkAliases("DorisContainer") @@ -74,23 +83,40 @@ public GenericContainer createDorisContainer() { .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))) - // use customer conf - .withCopyFileToContainer( - MountableFile.forClasspathResource("docker/doris/be.conf"), - "/opt/apache-doris/be/conf/be.conf") - .withCopyFileToContainer( - MountableFile.forClasspathResource("docker/doris/fe.conf"), - "/opt/apache-doris/fe/conf/fe.conf") - .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610); + .withCommand( + "sh", + "-c", + "chmod -R 644 /root/be/conf/be.conf /root/fe/conf/fe.conf && chmod -R 755 /root/be/conf /root/fe/conf && chown -R root:root /root/be/conf /root/fe/conf") + .withClasspathResourceMapping( + "docker/doris/be.conf", + "/root/be/conf/be.conf", + BindMode.READ_WRITE) + .withClasspathResourceMapping( + "docker/doris/fe.conf", + "/root/fe/conf/fe.conf", + BindMode.READ_WRITE) + // These exposed ports are used to connect to Doris. They are the default + // ports for yagagagaga/doris-standalone:2.1.7. + // For more information, see: + // https://hub.docker.com/r/yagagagaga/doris-standalone + .withExposedPorts( + FE.HTTP_PORT, + FE.QUERY_PORT, + BE.THRIFT_PORT, + BE.WEBSERVICE_PORT, + FE.FLIGHT_SQL_PORT, + BE.FLIGHT_SQL_PORT) + .withStartupTimeout(Duration.ofMinutes(5)) + .withEnv("TZ", systemTimeZone); container.setPortBindings( Lists.newArrayList( - String.format("%s:%s", "8030", "8030"), - String.format("%s:%s", "9030", "9030"), - String.format("%s:%s", "9060", "9060"), - String.format("%s:%s", "8040", "8040"), - String.format("%s:%s", "9611", "9611"), - String.format("%s:%s", "9610", "9610"))); + String.format("%s:%s", FE.HTTP_PORT, FE.HTTP_PORT), + String.format("%s:%s", FE.QUERY_PORT, FE.QUERY_PORT), + String.format("%s:%s", BE.THRIFT_PORT, BE.THRIFT_PORT), + String.format("%s:%s", BE.WEBSERVICE_PORT, BE.WEBSERVICE_PORT), + String.format("%s:%s", FE.FLIGHT_SQL_PORT, FE.FLIGHT_SQL_PORT), + String.format("%s:%s", BE.FLIGHT_SQL_PORT, BE.FLIGHT_SQL_PORT))); return container; } @@ -99,6 +125,14 @@ public void startContainer() { LOG.info("Starting doris containers."); // singleton doris container dorisContainer.start(); + // print Doris configuration information. + ExecResult feExecResult = + dorisContainer.execInContainer("cat", "/root/fe/conf/fe.conf"); + ExecResult beExecResult = + dorisContainer.execInContainer("cat", "/root/be/conf/be.conf"); + LOG.info("FE config: {}", feExecResult.getStdout()); + LOG.info("BE config: {}", beExecResult.getStdout()); + waitDorisFeRunning(); initializeJdbcConnection(); initializeVariables(); printClusterStatus(); @@ -111,11 +145,21 @@ public void startContainer() { @Override public void restartContainer() { - LOG.info("Restart doris container."); - dorisContainer - .getDockerClient() - .restartContainerCmd(dorisContainer.getContainerId()) - .exec(); + LOG.info("Restarting Doris container..."); + + try (RestartContainerCmd restartCmd = + dorisContainer + .getDockerClient() + .restartContainerCmd(dorisContainer.getContainerId())) { + restartCmd.exec(); + LOG.info("Restart command executed, waiting for container services to be ready"); + waitDorisFeRunning(); + } catch (Exception e) { + LOG.error("Failed to restart Doris container", e); + throw new RuntimeException("Container restart failed", e); + } + + LOG.info("Doris container successfully restarted and services are ready"); } @Override @@ -172,18 +216,20 @@ public String getPassword() { @Override public String getFenodes() { - return dorisContainer.getHost() + ":8030"; + return dorisContainer.getHost() + ":" + FE.HTTP_PORT; } @Override public String getBenodes() { - return dorisContainer.getHost() + ":8040"; + return dorisContainer.getHost() + ":" + BE.WEBSERVICE_PORT; } public void close() { - LOG.info("Doris container is about to be close."); - dorisContainer.close(); - LOG.info("Doris container closed successfully."); + if (dorisContainer != null) { + LOG.info("Doris container is about to be close."); + dorisContainer.close(); + LOG.info("Doris container closed successfully."); + } } private void initializeJDBCDriver() throws MalformedURLException { @@ -196,6 +242,9 @@ private void initializeJDBCDriver() throws MalformedURLException { private void initializeJdbcConnection() throws Exception { initializeJDBCDriver(); + // before connecting to Doris, wait for Doris FE to start,which is to avoid connect Doris + // failed when Doris FE is not ready. + waitDorisFeRunning(); try (Connection connection = getQueryConnection(); Statement statement = connection.createStatement()) { ResultSet resultSet; @@ -207,6 +256,58 @@ private void initializeJdbcConnection() throws Exception { LOG.info("Connected to Doris successfully."); } + /** + * Wait for Doris container to running. If the Doris FE not startup completely, try to connect + * it again until the doris FE is ready. + */ + private void waitDorisFeRunning() { + LOG.info("Waiting for Doris services to be accessible..."); + + // Poll Doris FE HTTP service every second with a maximum wait time of 5 minutes + // If the service is not available within this time, a timeout exception will be thrown + try { + Awaitility.await("FE HTTP Service") + .atMost(5, TimeUnit.MINUTES) + .pollInterval(1, TimeUnit.SECONDS) + .until( + () -> { + try { + ExecResult result = + dorisContainer.execInContainer( + "curl", + "-s", + "-o", + "/dev/null", + "-w", + "%{http_code}", + "-m", + "2", + "http://localhost:" + FE.HTTP_PORT); + boolean ready = result.getStdout().equals("200"); + LOG.info( + "FE HTTP service on port {} is ready: {}", + FE.HTTP_PORT, + ready); + + if (ready) { + LOG.info( + "FE HTTP service on port {} is ready", + FE.HTTP_PORT); + } + return ready; + } catch (Exception e) { + LOG.debug( + "Exception while checking FE HTTP service: {}", + e.getMessage()); + return false; + } + }); + + } catch (ConditionTimeoutException e) { + LOG.warn("Timed out after 5 minutes waiting for Doris services to be ready"); + } + } + private boolean isBeReady(ResultSet rs, Duration duration) throws SQLException { LockSupport.parkNanos(duration.toNanos()); if (rs.next()) { @@ -249,12 +350,12 @@ private void echo(String... cmd) { reader.close(); p.destroy(); } catch (Exception e) { - e.printStackTrace(); + LOG.info("Execute command in docker container failed. ", e); } } - private List convertList(ResultSet rs) throws SQLException { - List list = new ArrayList<>(); + private List> convertList(ResultSet rs) throws SQLException { + List> list = new ArrayList<>(); ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); while (rs.next()) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java index f41469902..107f549e2 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java @@ -129,7 +129,7 @@ public void cancel() {} mockSource.sinkTo(builder.build()); JobClient jobClient = env.executeAsync(); CompletableFuture jobStatus = jobClient.getJobStatus(); - LOG.info("Job status: {}", jobStatus); + LOG.info("batch Mode:{} Job status: {}", batchMode, jobStatus.get()); waitForJobStatus( jobClient, @@ -143,20 +143,20 @@ public void cancel() {} JobStatus.CANCELED, JobStatus.FAILED, JobStatus.RESTARTING); + LOG.info("batch Mode:{} wait job error status", batchMode); + waitForJobStatus(jobClient, errorStatus, Deadline.fromNow(Duration.ofSeconds(60))); - waitForJobStatus(jobClient, errorStatus, Deadline.fromNow(Duration.ofSeconds(30))); - - LOG.info("start to create add table"); + LOG.info("batch Mode:{} start to create add table", batchMode); initializeTable(TABLE_MULTI_CSV_NO_EXIST_TBL); - LOG.info("wait job restart success"); + LOG.info("batch Mode:{} wait job restart success", batchMode); // wait table restart success waitForJobStatus( jobClient, Collections.singletonList(RUNNING), Deadline.fromNow(Duration.ofSeconds(60))); - LOG.info("wait job running finished"); + LOG.info("batch Mode:{} wait job running finished", batchMode); waitForJobStatus( jobClient, Collections.singletonList(FINISHED), @@ -165,12 +165,14 @@ public void cancel() {} String queryRes = String.format( "select id,task_id from %s.%s ", DATABASE, TABLE_MULTI_CSV_NO_EXIST_TBL); - List expected = Arrays.asList("1,3"); + List expected = Collections.singletonList("1,3"); if (!batchMode) { + LOG.info("check stream mode result!"); ContainerUtils.checkResult( getDorisQueryConnection(), LOG, expected, queryRes, 2, false); } else { + LOG.info("check batch mode result!"); List actualResult = ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, queryRes, 2); LOG.info("actual size: {}, expected size: {}", actualResult.size(), expected.size()); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 73f6e03ac..e00220800 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -37,6 +37,7 @@ import org.apache.doris.flink.cfg.DorisStreamOptions; import org.apache.doris.flink.container.AbstractITCaseService; import org.apache.doris.flink.container.ContainerUtils; +import org.apache.doris.flink.container.config.DorisPorts.FE; import org.apache.doris.flink.datastream.DorisSourceFunction; import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; import org.apache.doris.flink.table.DorisConfigOptions; @@ -92,7 +93,7 @@ public static Object[] parameters() { new Object[] { false, -1, }, - new Object[] {true, 9611} + new Object[] {true, FE.FLIGHT_SQL_PORT} }; } diff --git a/flink-doris-connector/src/test/resources/docker/doris/be.conf b/flink-doris-connector/src/test/resources/docker/doris/be.conf index 94b76e094..102f68049 100644 --- a/flink-doris-connector/src/test/resources/docker/doris/be.conf +++ b/flink-doris-connector/src/test/resources/docker/doris/be.conf @@ -19,13 +19,13 @@ CUR_DATE=`date +%Y%m%d-%H%M%S` PPROF_TMPDIR="$DORIS_HOME/log/" -JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" +JAVA_OPTS="-Xmx2048m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" # For jdk 9+, this JAVA_OPTS will be used as default JVM options -JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" +JAVA_OPTS_FOR_JDK_9="-Xmx2048m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" # For jdk 17+, this JAVA_OPTS will be used as default JVM options -JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED" +JAVA_OPTS_FOR_JDK_17="-Xmx2048m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED" # since 1.2, the JAVA_HOME need to be set to run BE process. # JAVA_HOME=/path/to/jdk/ @@ -43,7 +43,7 @@ be_port = 9060 webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 -arrow_flight_sql_port = 9610 +arrow_flight_sql_port = 10040 enable_debug_points = true # HTTPS configures diff --git a/flink-doris-connector/src/test/resources/docker/doris/fe.conf b/flink-doris-connector/src/test/resources/docker/doris/fe.conf index a45fb5359..c88b86620 100644 --- a/flink-doris-connector/src/test/resources/docker/doris/fe.conf +++ b/flink-doris-connector/src/test/resources/docker/doris/fe.conf @@ -25,6 +25,8 @@ CUR_DATE=`date +%Y%m%d-%H%M%S` # Log dir LOG_DIR = ${DORIS_HOME}/log +# For jdk 8 +JAVA_OPTS="-Dfile.encoding=UTF-8 -Djavax.security.auth.useSubjectCredsOnly=false -Xss4m -Xmx8192m -XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDateStamps -XX:+PrintGCDetails -Xloggc:$LOG_DIR/log/fe.gc.log.$CUR_DATE -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=50M -Dlog4j2.formatMsgNoLookups=true" # For jdk 17, this JAVA_OPTS will be used as default JVM options JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR -Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED" @@ -47,7 +49,7 @@ http_port = 8030 rpc_port = 9020 query_port = 9030 edit_log_port = 9010 -arrow_flight_sql_port = 9611 +arrow_flight_sql_port = 10030 enable_debug_points = true arrow_flight_token_cache_size = 50 # Choose one if there are more than one ip except loopback address.