Skip to content

future(provider): future:server support timeout interrupt #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public class ProviderConfig<T> implements Cloneable {
*/
@ConfigProperty(value = Constants.DEFAULT_SERVER_TIMEOUT_MS, type = Integer.class, override = true)
protected int requestTimeout;

/**
* Whether to enable execute timeout interrupt.
*/
@ConfigProperty(value="false",type=Boolean.class,override = true)
protected Boolean enableTimeoutInterrupt;

/**
* Thread pool configuration.
*/
Expand Down Expand Up @@ -288,6 +295,20 @@ public void setRequestTimeout(int requestTimeout) {
this.requestTimeout = requestTimeout;
}

public Boolean getEnableTimeoutInterrupt() {
checkFiledModifyPrivilege();
if(null != enableTimeoutInterrupt){
return enableTimeoutInterrupt;
}

return serviceConfig.getEnableTimeoutInterrupt();
}

public void setEnableTimeoutInterrupt(boolean enableTimeoutInterrupt) {
checkFiledModifyPrivilege();
this.enableTimeoutInterrupt = enableTimeoutInterrupt;
}

public String getWorkerPool() {
if (null != workerPool) {
return workerPool;
Expand Down Expand Up @@ -336,4 +357,6 @@ public void setEnableLinkTimeout(Boolean enableLinkTimeout) {
this.enableLinkTimeout = enableLinkTimeout;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public class ServerConfig {
*/
@ConfigProperty(value = "false", type = Boolean.class)
protected Boolean enableLinkTimeout;
/**
* Whether to enable execute timeout interrupt.
*/
@ConfigProperty(value="false",type=Boolean.class)
protected Boolean enableTimeoutInterrupt;
/**
* Whether to disable default filters:
* <p>{@link com.tencent.trpc.core.filter.ProviderInvokerHeadFilter}</p>
Expand Down Expand Up @@ -413,4 +418,13 @@ public void setRunListeners(List<String> runListeners) {
this.runListeners = runListeners;
}

public Boolean getEnableTimeoutInterrupt() {
return enableTimeoutInterrupt;
}

public void setEnableTimeoutInterrupt(Boolean enableTimeoutInterrupt) {
checkFiledModifyPrivilege();
this.enableTimeoutInterrupt = enableTimeoutInterrupt;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public class ServiceConfig extends BaseProtocolConfig {
*/
@ConfigProperty(value = "false", type = Boolean.class, override = true)
protected Boolean enableLinkTimeout;
/**
* Whether to enable execute timeout interrupt.
*/
@ConfigProperty(value="false",type=Boolean.class,override = true)
protected Boolean enableTimeoutInterrupt;

protected AtomicBoolean setDefault = new AtomicBoolean(Boolean.FALSE);
protected AtomicBoolean initialized = new AtomicBoolean(Boolean.FALSE);
Expand Down Expand Up @@ -623,4 +628,12 @@ public void setEnableLinkTimeout(Boolean enableLinkTimeout) {
this.enableLinkTimeout = enableLinkTimeout;
}

public Boolean getEnableTimeoutInterrupt() {
return enableTimeoutInterrupt;
}

public void setEnableTimeoutInterrupt(Boolean enableTimeoutInterrupt) {
checkFiledModifyPrivilege();
this.enableTimeoutInterrupt = enableTimeoutInterrupt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public DefProviderInvoker(ProtocolConfig config, ProviderConfig<T> pConfig) {
* @param costTime elapsed time in ms
* @return timeout in ms
*/
private LeftTimeout parseTimeout(final Request request, final long costTime) {
public LeftTimeout parseTimeout(final Request request, final long costTime) {
// The timeout set by the caller, minus the network time, queue waiting time, etc. to get the remaining time
int reqLeftTimeout = request.getMeta().getTimeout() - (int) costTime;
// The timeout set by the callee
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.tencent.trpc.core.management.PoolMXBean;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

/**
Expand All @@ -27,6 +28,8 @@ public interface WorkerPool {

void execute(Task task) throws RejectedExecutionException;

Future submit(Task task) throws RejectedExecutionException;

PoolMXBean report();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -112,6 +113,17 @@ public void execute(Task task) throws RejectedExecutionException {

}

@Override
public Future submit(Task task) throws RejectedExecutionException {
return forkJoinPool.submit(() -> {
try {
task.run();
} catch (Throwable ex) {
logger.error("submit task failure:", ex);
}
});
}

@Override
public PoolMXBean report() {
return forkJoinPoolMXBean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -205,6 +206,17 @@ public void execute(Task task) {
});
}

@Override
public Future submit(Task task) {
return threadPool.submit(() -> {
try {
task.run();
} catch (Throwable ex) {
logger.error("", ex);
}
});
}

@Override
public PoolMXBean report() {
return threadPoolMXBean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.tencent.trpc.core.management.ForkJoinPoolMXBeanImpl;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -45,6 +46,11 @@ public void execute(Task task) throws RejectedExecutionException {

}

@Override
public Future submit(Task task) throws RejectedExecutionException {
return null;
}

@Override
public ForkJoinPoolMXBeanImpl report() {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Tencent is pleased to support the open source community by making tRPC available.
*
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* All rights reserved.
*
* If you have downloaded a copy of the tRPC source code from Tencent,
Expand Down Expand Up @@ -35,6 +35,8 @@
import com.tencent.trpc.core.rpc.common.RpcMethodInfoAndInvoker;
import com.tencent.trpc.core.rpc.def.DecodableValue;
import com.tencent.trpc.core.rpc.def.DefMethodInfoRegister;
import com.tencent.trpc.core.rpc.def.DefProviderInvoker;
import com.tencent.trpc.core.rpc.def.LeftTimeout;
import com.tencent.trpc.core.transport.Channel;
import com.tencent.trpc.core.transport.ServerTransport;
import com.tencent.trpc.core.transport.codec.ServerCodec;
Expand All @@ -47,7 +49,10 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;

/**
Expand Down Expand Up @@ -233,13 +238,38 @@ private void handle(Channel channel, Request request) {
}
try {
ProviderInvoker<?> invoker = rpcMethodInfoAndInvoker.getInvoker();
invoker.getConfig().getWorkerPoolObj().execute(() -> {
final ProviderConfig<?> config = invoker.getConfig();
final long start = System.nanoTime();
Future<?> future = config.getWorkerPoolObj().submit(() -> {
try {
dispatch(channel, invoker, request);
} catch (Throwable ex) {
LOG.error("Dispatch request|" + request + " error", ex);
}
});

// if enable timeout interrupt and execute timeout, interrupt thread.
if (config.getTimeoutInterrupt()) {
int timeout = 0;
if (config.getEnableLinkTimeout() && invoker instanceof DefProviderInvoker) {
// link timeout need modify
long costTime = System.currentTimeMillis() - request.getMeta().getCreateTime();
LeftTimeout leftTimeout = ((DefProviderInvoker) invoker).parseTimeout(request, costTime);
timeout = leftTimeout.getLeftTimeout() - (int)((System.nanoTime() - start) / 1000000000);
} else {
timeout = config.getRequestTimeout() - (int)((System.nanoTime() - start) / 1000000000);
}
LOG.debug("Dispatch request timeout:{} ms", timeout);
try {
future.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.error("Dispatch request [" + request + "] timeout,interrupt thread.");
boolean cancelSuccess = future.cancel(true);
LOG.info("Interrupt thread success?{}", cancelSuccess);
} catch (Throwable t) {
throw t;
}
}
} catch (Throwable ex) {
LOG.error("Dispatch request [" + request + "] error", ex);
if (ex instanceof RejectedExecutionException) {
Expand Down Expand Up @@ -269,7 +299,7 @@ private RpcMethodInfoAndInvoker route(Request req, boolean ex) {
return route;
} else if (ex) {
throw TRpcException.newFrameException(ErrorCode.TRPC_SERVER_NOFUNC_ERR,
"Not found {func=%s}", invocation.getFunc());
"Not found {func=%s}", invocation.getFunc());
}
return null;
}
Expand All @@ -296,7 +326,7 @@ private void setInvocation(Request request, RpcMethodInfo methodInfo, MethodRout
if (null == request.getInvocation()) {
throw new IllegalArgumentException(
String.format("Server(%s), request(%s), Request invocation is null",
protocolConfig.toSimpleString(), requestToString(request)));
protocolConfig.toSimpleString(), requestToString(request)));
}
RpcInvocation invocation = request.getInvocation();
invocation.setRpcMethodInfo(methodInfo);
Expand Down Expand Up @@ -340,7 +370,9 @@ protected void dispatch(final Channel channel, ProviderInvoker<?> invoker, Reque
if (request.getMeta().isOneWay()) {
try {
invoke(invoker, request).whenComplete((r, t) ->
printException(request, t, "onewayInvoke exception"));
printException(request,
t,
"onewayInvoke exception"));
} catch (Throwable ex) {
printException(request, ex, "onewayInvoke exception");
}
Expand Down Expand Up @@ -387,8 +419,8 @@ private void reply(Channel channel, Request request, Response response) {
});
} else {
LOG.error("Found rpcServiceName={}, rpcMethodName={}, return value is <null>",
request.getInvocation().getRpcServiceName(),
request.getInvocation().getRpcMethodName());
request.getInvocation().getRpcServiceName(),
request.getInvocation().getRpcMethodName());
}
} else {
LOG.error(
Expand All @@ -410,7 +442,7 @@ private void errorReply(Channel channel, Request request, int errorCode, int biz
String msg) {
Response response =
RpcUtils.newResponse(request, null,
TRpcException.newException(errorCode, bizCode, msg));
TRpcException.newException(errorCode, bizCode, msg));
if (channel.isConnected()) {
channel.send(response).whenComplete((rx, tx) -> {
if (tx != null) {
Expand All @@ -419,7 +451,7 @@ private void errorReply(Channel channel, Request request, int errorCode, int biz
});
} else {
LOG.error("Request{" + requestToString(request) + "} reply error, channel={" + channel
+ "} is close or disconnect");
+ "} is close or disconnect");
}
}

Expand Down
Loading