diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java index 930bc2731..18263f7e5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java @@ -58,4 +58,6 @@ public interface ConfigurationOptions { String FLIGHT_SQL_PORT = "source.flight-sql-port"; Integer FLIGHT_SQL_PORT_DEFAULT = -1; + + Integer DEFAULT_SINK_SOCKET_TIMEOUT_MS = 9 * 60 * 1000; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 831a317ee..b1dcc5009 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.Properties; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DEFAULT_SINK_SOCKET_TIMEOUT_MS; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.JSON; import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE; @@ -66,6 +67,7 @@ public class DorisExecutionOptions implements Serializable { private final boolean ignoreUpdateBefore; private final WriteMode writeMode; private final boolean ignoreCommitError; + private final int sinkSocketTimeoutMs; public DorisExecutionOptions( int checkInterval, @@ -85,7 +87,8 @@ public DorisExecutionOptions( boolean ignoreUpdateBefore, boolean force2PC, WriteMode writeMode, - boolean ignoreCommitError) { + boolean ignoreCommitError, + int sinkSocketTimeoutMs) { Preconditions.checkArgument(maxRetries >= 0); this.checkInterval = checkInterval; this.maxRetries = maxRetries; @@ -107,6 +110,8 @@ public DorisExecutionOptions( this.ignoreUpdateBefore = ignoreUpdateBefore; this.writeMode = writeMode; this.ignoreCommitError = ignoreCommitError; + + this.sinkSocketTimeoutMs = sinkSocketTimeoutMs; } public static Builder builder() { @@ -214,6 +219,10 @@ public boolean ignoreCommitError() { return ignoreCommitError; } + public int getSinkSocketTimeoutMs() { + return sinkSocketTimeoutMs; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -240,7 +249,8 @@ public boolean equals(Object o) { && Objects.equals(streamLoadProp, that.streamLoadProp) && Objects.equals(enableDelete, that.enableDelete) && Objects.equals(enable2PC, that.enable2PC) - && writeMode == that.writeMode; + && writeMode == that.writeMode + && sinkSocketTimeoutMs == that.sinkSocketTimeoutMs; } @Override @@ -263,7 +273,8 @@ public int hashCode() { enableBatchMode, ignoreUpdateBefore, writeMode, - ignoreCommitError); + ignoreCommitError, + sinkSocketTimeoutMs); } /** Builder of {@link DorisExecutionOptions}. */ @@ -292,6 +303,8 @@ public static class Builder { private WriteMode writeMode = WriteMode.STREAM_LOAD; private boolean ignoreCommitError = false; + private int sinkSocketTimeoutMs = DEFAULT_SINK_SOCKET_TIMEOUT_MS; + /** * Sets the checkInterval to check exception with the interval while loading, The default is * 0, disabling the checker thread. @@ -497,6 +510,17 @@ public Builder setIgnoreCommitError(boolean ignoreCommitError) { return this; } + /** + * Set http socket timeout, only effective in batch mode. + * + * @param sinkSocketTimeoutMs + * @return this DorisExecutionOptions.builder. + */ + public Builder setSinkSocketTimeoutMs(int sinkSocketTimeoutMs) { + this.sinkSocketTimeoutMs = sinkSocketTimeoutMs; + return this; + } + /** * Build the {@link DorisExecutionOptions}. * @@ -540,7 +564,8 @@ public DorisExecutionOptions build() { ignoreUpdateBefore, force2PC, writeMode, - ignoreCommitError); + ignoreCommitError, + sinkSocketTimeoutMs); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java index 38716826c..83edd21fe 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java @@ -17,8 +17,10 @@ package org.apache.doris.flink.sink; +import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.http.client.config.RequestConfig; +import org.apache.http.config.SocketConfig; import org.apache.http.impl.NoConnectionReuseStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; @@ -83,7 +85,7 @@ public CloseableHttpClient getHttpClient() { * * @return */ - public HttpClientBuilder getHttpClientBuilderForBatch() { + public HttpClientBuilder getHttpClientBuilderForBatch(DorisExecutionOptions executionOptions) { return HttpClients.custom() .setRedirectStrategy( new DefaultRedirectStrategy() { @@ -96,10 +98,10 @@ protected boolean isRedirectable(String method) { RequestConfig.custom() .setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectTimeout) - // todo: Need to be extracted to DorisExecutionOption - // default checkpoint timeout is 10min - .setSocketTimeout(9 * 60 * 1000) - .build()); + // default socket timeout 9min, checkpoint timeout default 10min + .setSocketTimeout(executionOptions.getSinkSocketTimeoutMs()) + .build()) + .setDefaultSocketConfig(SocketConfig.custom().setSoKeepAlive(true).build()); } public HttpClientBuilder getHttpClientBuilderForCopyBatch() { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 136c407eb..04cd45b7c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -167,12 +167,13 @@ public DorisBatchStreamLoad( 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), - new DefaultThreadFactory("streamload-executor"), + new DefaultThreadFactory("streamload-executor-" + subTaskId), new ThreadPoolExecutor.AbortPolicy()); this.started = new AtomicBoolean(true); this.loadExecutorService.execute(loadAsyncExecutor); this.subTaskId = subTaskId; - this.httpClientBuilder = new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch(); + this.httpClientBuilder = + new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch(executionOptions); } /** diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index ffd3ec92e..ac06b0589 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Properties; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DEFAULT_SINK_SOCKET_TIMEOUT_MS; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; @@ -294,6 +295,13 @@ public class DorisConfigOptions { "the flush interval mills, over this time, asynchronous threads will flush data. The " + "default value is 10s."); + public static final ConfigOption SINK_SOCKET_TIMEOUT = + ConfigOptions.key("sink.socket.timeout") + .durationType() + .defaultValue(Duration.ofMillis(DEFAULT_SINK_SOCKET_TIMEOUT_MS)) + .withDescription( + "the socket timeout for stream load, the default value is 9min, only effective in batch mode."); + public static final ConfigOption SINK_IGNORE_UPDATE_BEFORE = ConfigOptions.key("sink.ignore.update-before") .booleanType() diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 2559e1f04..8b6a280e4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -78,6 +78,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_SOCKET_TIMEOUT; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_WRITE_MODE; import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; @@ -162,6 +163,8 @@ public Set> optionalOptions() { options.add(USE_FLIGHT_SQL); options.add(FLIGHT_SQL_PORT); + + options.add(SINK_SOCKET_TIMEOUT); return options; } @@ -258,6 +261,7 @@ private DorisExecutionOptions getDorisExecutionOptions( builder.setBufferFlushMaxBytes( (int) readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes()); builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); + builder.setSinkSocketTimeoutMs((int) readableConfig.get(SINK_SOCKET_TIMEOUT).toMillis()); builder.setUseCache(readableConfig.get(SINK_USE_CACHE)); return builder.build(); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 0ebc48fc5..732cb27da 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -317,6 +317,7 @@ public void testTableBatch() throws Exception { + " 'sink.ignore.update-before' = 'false'," + " 'sink.enable.batch-mode' = '%s'," + " 'sink.enable-delete' = 'true'," + + " 'sink.socket.timeout' = '5m'," + " 'sink.flush.queue-size' = '2'," + " 'sink.buffer-flush.max-rows' = '10000'," + " 'sink.buffer-flush.max-bytes' = '10MB'," diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java index 7f6213f79..39a324e12 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java @@ -143,6 +143,7 @@ public void testDorisSinkProperties() { properties.put("sink.ignore.update-before", "true"); properties.put("sink.ignore.commit-error", "false"); properties.put("sink.parallelism", "1"); + properties.put("sink.socket.timeout", "9m"); DynamicTableSink actual = createTableSink(SCHEMA, properties); DorisOptions options = @@ -175,6 +176,7 @@ public void testDorisSinkProperties() { .setFlushQueueSize(2) .setUseCache(true) .setIgnoreCommitError(false) + .setSinkSocketTimeoutMs(9 * 60 * 1000) .build(); final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();