Skip to content

[improve] add socket timeout config #592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ public interface ConfigurationOptions {

String FLIGHT_SQL_PORT = "source.flight-sql-port";
Integer FLIGHT_SQL_PORT_DEFAULT = -1;

Integer DEFAULT_SINK_SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.Properties;

import static org.apache.doris.flink.cfg.ConfigurationOptions.DEFAULT_SINK_SOCKET_TIMEOUT_MS;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class DorisExecutionOptions implements Serializable {
private final boolean ignoreUpdateBefore;
private final WriteMode writeMode;
private final boolean ignoreCommitError;
private final int sinkSocketTimeoutMs;

public DorisExecutionOptions(
int checkInterval,
Expand All @@ -85,7 +87,8 @@ public DorisExecutionOptions(
boolean ignoreUpdateBefore,
boolean force2PC,
WriteMode writeMode,
boolean ignoreCommitError) {
boolean ignoreCommitError,
int sinkSocketTimeoutMs) {
Preconditions.checkArgument(maxRetries >= 0);
this.checkInterval = checkInterval;
this.maxRetries = maxRetries;
Expand All @@ -107,6 +110,8 @@ public DorisExecutionOptions(
this.ignoreUpdateBefore = ignoreUpdateBefore;
this.writeMode = writeMode;
this.ignoreCommitError = ignoreCommitError;

this.sinkSocketTimeoutMs = sinkSocketTimeoutMs;
}

public static Builder builder() {
Expand Down Expand Up @@ -214,6 +219,10 @@ public boolean ignoreCommitError() {
return ignoreCommitError;
}

public int getSinkSocketTimeoutMs() {
return sinkSocketTimeoutMs;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -240,7 +249,8 @@ public boolean equals(Object o) {
&& Objects.equals(streamLoadProp, that.streamLoadProp)
&& Objects.equals(enableDelete, that.enableDelete)
&& Objects.equals(enable2PC, that.enable2PC)
&& writeMode == that.writeMode;
&& writeMode == that.writeMode
&& sinkSocketTimeoutMs == that.sinkSocketTimeoutMs;
}

@Override
Expand All @@ -263,7 +273,8 @@ public int hashCode() {
enableBatchMode,
ignoreUpdateBefore,
writeMode,
ignoreCommitError);
ignoreCommitError,
sinkSocketTimeoutMs);
}

/** Builder of {@link DorisExecutionOptions}. */
Expand Down Expand Up @@ -292,6 +303,8 @@ public static class Builder {
private WriteMode writeMode = WriteMode.STREAM_LOAD;
private boolean ignoreCommitError = false;

private int sinkSocketTimeoutMs = DEFAULT_SINK_SOCKET_TIMEOUT_MS;

/**
* Sets the checkInterval to check exception with the interval while loading, The default is
* 0, disabling the checker thread.
Expand Down Expand Up @@ -497,6 +510,17 @@ public Builder setIgnoreCommitError(boolean ignoreCommitError) {
return this;
}

/**
* Set http socket timeout, only effective in batch mode.
*
* @param sinkSocketTimeoutMs
* @return this DorisExecutionOptions.builder.
*/
public Builder setSinkSocketTimeoutMs(int sinkSocketTimeoutMs) {
this.sinkSocketTimeoutMs = sinkSocketTimeoutMs;
return this;
}

/**
* Build the {@link DorisExecutionOptions}.
*
Expand Down Expand Up @@ -540,7 +564,8 @@ public DorisExecutionOptions build() {
ignoreUpdateBefore,
force2PC,
writeMode,
ignoreCommitError);
ignoreCommitError,
sinkSocketTimeoutMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.doris.flink.sink;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.SocketConfig;
import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
Expand Down Expand Up @@ -83,7 +85,7 @@ public CloseableHttpClient getHttpClient() {
*
* @return
*/
public HttpClientBuilder getHttpClientBuilderForBatch() {
public HttpClientBuilder getHttpClientBuilderForBatch(DorisExecutionOptions executionOptions) {
return HttpClients.custom()
.setRedirectStrategy(
new DefaultRedirectStrategy() {
Expand All @@ -96,10 +98,10 @@ protected boolean isRedirectable(String method) {
RequestConfig.custom()
.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectTimeout)
// todo: Need to be extracted to DorisExecutionOption
// default checkpoint timeout is 10min
.setSocketTimeout(9 * 60 * 1000)
.build());
// default socket timeout 9min, checkpoint timeout default 10min
.setSocketTimeout(executionOptions.getSinkSocketTimeoutMs())
.build())
.setDefaultSocketConfig(SocketConfig.custom().setSoKeepAlive(true).build());
}

public HttpClientBuilder getHttpClientBuilderForCopyBatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,13 @@ public DorisBatchStreamLoad(
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new DefaultThreadFactory("streamload-executor"),
new DefaultThreadFactory("streamload-executor-" + subTaskId),
new ThreadPoolExecutor.AbortPolicy());
this.started = new AtomicBoolean(true);
this.loadExecutorService.execute(loadAsyncExecutor);
this.subTaskId = subTaskId;
this.httpClientBuilder = new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
this.httpClientBuilder =
new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch(executionOptions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.doris.flink.cfg.ConfigurationOptions.DEFAULT_SINK_SOCKET_TIMEOUT_MS;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
Expand Down Expand Up @@ -294,6 +295,13 @@ public class DorisConfigOptions {
"the flush interval mills, over this time, asynchronous threads will flush data. The "
+ "default value is 10s.");

public static final ConfigOption<Duration> SINK_SOCKET_TIMEOUT =
ConfigOptions.key("sink.socket.timeout")
.durationType()
.defaultValue(Duration.ofMillis(DEFAULT_SINK_SOCKET_TIMEOUT_MS))
.withDescription(
"the socket timeout for stream load, the default value is 9min, only effective in batch mode.");

public static final ConfigOption<Boolean> SINK_IGNORE_UPDATE_BEFORE =
ConfigOptions.key("sink.ignore.update-before")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_SOCKET_TIMEOUT;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_WRITE_MODE;
import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
Expand Down Expand Up @@ -162,6 +163,8 @@ public Set<ConfigOption<?>> optionalOptions() {

options.add(USE_FLIGHT_SQL);
options.add(FLIGHT_SQL_PORT);

options.add(SINK_SOCKET_TIMEOUT);
return options;
}

Expand Down Expand Up @@ -258,6 +261,7 @@ private DorisExecutionOptions getDorisExecutionOptions(
builder.setBufferFlushMaxBytes(
(int) readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes());
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
builder.setSinkSocketTimeoutMs((int) readableConfig.get(SINK_SOCKET_TIMEOUT).toMillis());
builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ public void testTableBatch() throws Exception {
+ " 'sink.ignore.update-before' = 'false',"
+ " 'sink.enable.batch-mode' = '%s',"
+ " 'sink.enable-delete' = 'true',"
+ " 'sink.socket.timeout' = '5m',"
+ " 'sink.flush.queue-size' = '2',"
+ " 'sink.buffer-flush.max-rows' = '10000',"
+ " 'sink.buffer-flush.max-bytes' = '10MB',"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public void testDorisSinkProperties() {
properties.put("sink.ignore.update-before", "true");
properties.put("sink.ignore.commit-error", "false");
properties.put("sink.parallelism", "1");
properties.put("sink.socket.timeout", "9m");

DynamicTableSink actual = createTableSink(SCHEMA, properties);
DorisOptions options =
Expand Down Expand Up @@ -175,6 +176,7 @@ public void testDorisSinkProperties() {
.setFlushQueueSize(2)
.setUseCache(true)
.setIgnoreCommitError(false)
.setSinkSocketTimeoutMs(9 * 60 * 1000)
.build();

final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
Expand Down