Skip to content

Commit 35b39e9

Browse files
committed
Added option to disable grpc deadline for long streams
1 parent f313956 commit 35b39e9

File tree

5 files changed

+39
-13
lines changed

5 files changed

+39
-13
lines changed

coordination/src/main/java/tech/ydb/coordination/impl/Stream.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ class Stream implements GrpcReadWriteStream.Observer<SessionResponse> {
4040

4141
Stream(Rpc rpc) {
4242
this.scheduler = rpc.getScheduler();
43-
this.stream = rpc.createSession(GrpcRequestSettings.newBuilder().build());
43+
this.stream = rpc.createSession(GrpcRequestSettings.newBuilder()
44+
.disableDeadline()
45+
.build());
4446
}
4547

4648
public CompletableFuture<Status> startStream() {

core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class GrpcRequestSettings {
1717
private final long deadlineAfter;
1818
private final Integer preferredNodeID;
1919
private final boolean directMode;
20+
private final boolean deadlineDisabled;
2021
private final String traceId;
2122
private final List<String> clientCapabilities;
2223
private final Consumer<Metadata> trailersHandler;
@@ -27,6 +28,7 @@ private GrpcRequestSettings(Builder builder) {
2728
this.deadlineAfter = builder.deadlineAfter;
2829
this.preferredNodeID = builder.preferredNodeID;
2930
this.directMode = builder.directMode;
31+
this.deadlineDisabled = builder.deadlineDisabled;
3032
this.traceId = builder.traceId;
3133
this.clientCapabilities = builder.clientCapabilities;
3234
this.trailersHandler = builder.trailersHandler;
@@ -42,6 +44,10 @@ public long getDeadlineAfter() {
4244
return deadlineAfter;
4345
}
4446

47+
public boolean isDeadlineDisabled() {
48+
return deadlineDisabled;
49+
}
50+
4551
public Integer getPreferredNodeID() {
4652
return preferredNodeID;
4753
}
@@ -72,6 +78,7 @@ public GrpcFlowControl getFlowControl() {
7278

7379
public static final class Builder {
7480
private long deadlineAfter = 0L;
81+
private boolean deadlineDisabled = false;
7582
private Integer preferredNodeID = null;
7683
private boolean directMode = false;
7784
private String traceId = null;
@@ -91,6 +98,7 @@ public static final class Builder {
9198
*/
9299
public Builder withDeadlineAfter(long deadlineAfter) {
93100
this.deadlineAfter = deadlineAfter;
101+
this.deadlineDisabled = false;
94102
return this;
95103
}
96104

@@ -106,6 +114,7 @@ public Builder withDeadline(Duration duration) {
106114
} else {
107115
this.deadlineAfter = 0L;
108116
}
117+
this.deadlineDisabled = false;
109118
return this;
110119
}
111120

@@ -152,6 +161,11 @@ public Builder withPessimizationHook(BooleanSupplier pessimizationHook) {
152161
return this;
153162
}
154163

164+
public Builder disableDeadline() {
165+
this.deadlineDisabled = true;
166+
return this;
167+
}
168+
155169
public GrpcRequestSettings build() {
156170
return new GrpcRequestSettings(this);
157171
}

core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
8383
}
8484
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
8585
}
86+
if (settings.isDeadlineDisabled()) {
87+
options = options.withDeadline(null);
88+
}
8689

8790
try {
8891
GrpcChannel channel = getChannel(settings);
@@ -126,6 +129,9 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
126129
}
127130
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
128131
}
132+
if (settings.isDeadlineDisabled()) {
133+
options = options.withDeadline(null);
134+
}
129135

130136
try {
131137
GrpcChannel channel = getChannel(settings);
@@ -172,6 +178,9 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
172178
}
173179
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
174180
}
181+
if (settings.isDeadlineDisabled()) {
182+
options = options.withDeadline(null);
183+
}
175184

176185
try {
177186
GrpcChannel channel = getChannel(settings);

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ GrpcReadStream<Status> attach(AttachSessionSettings settings) {
142142
Context ctx = Context.ROOT.fork();
143143
Context previous = ctx.attach();
144144
try {
145-
GrpcRequestSettings grpcSettings = makeOptions(settings).build();
145+
GrpcRequestSettings grpcSettings = makeOptions(settings).disableDeadline().build();
146146
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
147147
return new GrpcReadStream<Status>() {
148148
@Override

topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import tech.ydb.proto.topic.v1.TopicServiceGrpc;
1717
import tech.ydb.topic.TopicRpc;
1818

19-
2019
/**
2120
* @author Nikolay Perfilov
2221
*/
@@ -49,7 +48,7 @@ public CompletableFuture<Status> alterTopic(YdbTopic.AlterTopicRequest request,
4948

5049
@Override
5150
public CompletableFuture<Result<YdbTopic.DescribeTopicResult>> describeTopic(YdbTopic.DescribeTopicRequest request,
52-
GrpcRequestSettings settings) {
51+
GrpcRequestSettings settings) {
5352
return transport
5453
.unaryCall(TopicServiceGrpc.getDescribeTopicMethod(), settings, request)
5554
.thenApply(OperationBinder.bindSync(
@@ -84,7 +83,7 @@ public CompletableFuture<Status> commitOffset(YdbTopic.CommitOffsetRequest reque
8483

8584
@Override
8685
public CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request,
87-
GrpcRequestSettings settings) {
86+
GrpcRequestSettings settings) {
8887
return transport
8988
.unaryCall(TopicServiceGrpc.getUpdateOffsetsInTransactionMethod(), settings, request)
9089
.thenApply(OperationBinder.bindSync(YdbTopic.UpdateOffsetsInTransactionResponse::getOperation));
@@ -93,19 +92,21 @@ public CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffse
9392
@Override
9493
public GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient>
9594
writeSession(String streamId) {
96-
return transport.readWriteStreamCall(
97-
TopicServiceGrpc.getStreamWriteMethod(),
98-
GrpcRequestSettings.newBuilder().withTraceId(streamId).build()
99-
);
95+
GrpcRequestSettings settings = GrpcRequestSettings.newBuilder()
96+
.withTraceId(streamId)
97+
.disableDeadline()
98+
.build();
99+
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamWriteMethod(), settings);
100100
}
101101

102102
@Override
103103
public GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient>
104104
readSession(String streamId) {
105-
return transport.readWriteStreamCall(
106-
TopicServiceGrpc.getStreamReadMethod(),
107-
GrpcRequestSettings.newBuilder().withTraceId(streamId).build()
108-
);
105+
GrpcRequestSettings settings = GrpcRequestSettings.newBuilder()
106+
.withTraceId(streamId)
107+
.disableDeadline()
108+
.build();
109+
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamReadMethod(), settings);
109110
}
110111

111112
@Override

0 commit comments

Comments
 (0)