Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class Stream implements GrpcReadWriteStream.Observer<SessionResponse> {

Stream(Rpc rpc) {
this.scheduler = rpc.getScheduler();
this.stream = rpc.createSession(GrpcRequestSettings.newBuilder().build());
this.stream = rpc.createSession(GrpcRequestSettings.newBuilder()
.disableDeadline()
.build());
}

public CompletableFuture<Status> startStream() {
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class GrpcRequestSettings {
private final long deadlineAfter;
private final Integer preferredNodeID;
private final boolean directMode;
private final boolean deadlineDisabled;
private final String traceId;
private final List<String> clientCapabilities;
private final Consumer<Metadata> trailersHandler;
Expand All @@ -27,6 +28,7 @@ private GrpcRequestSettings(Builder builder) {
this.deadlineAfter = builder.deadlineAfter;
this.preferredNodeID = builder.preferredNodeID;
this.directMode = builder.directMode;
this.deadlineDisabled = builder.deadlineDisabled;
this.traceId = builder.traceId;
this.clientCapabilities = builder.clientCapabilities;
this.trailersHandler = builder.trailersHandler;
Expand All @@ -42,6 +44,10 @@ public long getDeadlineAfter() {
return deadlineAfter;
}

public boolean isDeadlineDisabled() {
return deadlineDisabled;
}

public Integer getPreferredNodeID() {
return preferredNodeID;
}
Expand Down Expand Up @@ -72,6 +78,7 @@ public GrpcFlowControl getFlowControl() {

public static final class Builder {
private long deadlineAfter = 0L;
private boolean deadlineDisabled = false;
private Integer preferredNodeID = null;
private boolean directMode = false;
private String traceId = null;
Expand All @@ -91,6 +98,7 @@ public static final class Builder {
*/
public Builder withDeadlineAfter(long deadlineAfter) {
this.deadlineAfter = deadlineAfter;
this.deadlineDisabled = false;
return this;
}

Expand All @@ -106,6 +114,7 @@ public Builder withDeadline(Duration duration) {
} else {
this.deadlineAfter = 0L;
}
this.deadlineDisabled = false;
return this;
}

Expand Down Expand Up @@ -152,6 +161,11 @@ public Builder withPessimizationHook(BooleanSupplier pessimizationHook) {
return this;
}

public Builder disableDeadline() {
this.deadlineDisabled = true;
return this;
}

public GrpcRequestSettings build() {
return new GrpcRequestSettings(this);
}
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
}
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
}
if (settings.isDeadlineDisabled()) {
options = options.withDeadline(null);
}

try {
GrpcChannel channel = getChannel(settings);
Expand Down Expand Up @@ -126,6 +129,9 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
}
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
}
if (settings.isDeadlineDisabled()) {
options = options.withDeadline(null);
}

try {
GrpcChannel channel = getChannel(settings);
Expand Down Expand Up @@ -172,6 +178,9 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
}
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
}
if (settings.isDeadlineDisabled()) {
options = options.withDeadline(null);
}

try {
GrpcChannel channel = getChannel(settings);
Expand Down
2 changes: 1 addition & 1 deletion query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ GrpcReadStream<Status> attach(AttachSessionSettings settings) {
Context ctx = Context.ROOT.fork();
Context previous = ctx.attach();
try {
GrpcRequestSettings grpcSettings = makeOptions(settings).build();
GrpcRequestSettings grpcSettings = makeOptions(settings).disableDeadline().build();
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
return new GrpcReadStream<Status>() {
@Override
Expand Down
23 changes: 12 additions & 11 deletions topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import tech.ydb.proto.topic.v1.TopicServiceGrpc;
import tech.ydb.topic.TopicRpc;


/**
* @author Nikolay Perfilov
*/
Expand Down Expand Up @@ -49,7 +48,7 @@ public CompletableFuture<Status> alterTopic(YdbTopic.AlterTopicRequest request,

@Override
public CompletableFuture<Result<YdbTopic.DescribeTopicResult>> describeTopic(YdbTopic.DescribeTopicRequest request,
GrpcRequestSettings settings) {
GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getDescribeTopicMethod(), settings, request)
.thenApply(OperationBinder.bindSync(
Expand Down Expand Up @@ -84,7 +83,7 @@ public CompletableFuture<Status> commitOffset(YdbTopic.CommitOffsetRequest reque

@Override
public CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request,
GrpcRequestSettings settings) {
GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getUpdateOffsetsInTransactionMethod(), settings, request)
.thenApply(OperationBinder.bindSync(YdbTopic.UpdateOffsetsInTransactionResponse::getOperation));
Expand All @@ -93,19 +92,21 @@ public CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffse
@Override
public GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient>
writeSession(String streamId) {
return transport.readWriteStreamCall(
TopicServiceGrpc.getStreamWriteMethod(),
GrpcRequestSettings.newBuilder().withTraceId(streamId).build()
);
GrpcRequestSettings settings = GrpcRequestSettings.newBuilder()
.withTraceId(streamId)
.disableDeadline()
.build();
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamWriteMethod(), settings);
}

@Override
public GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient>
readSession(String streamId) {
return transport.readWriteStreamCall(
TopicServiceGrpc.getStreamReadMethod(),
GrpcRequestSettings.newBuilder().withTraceId(streamId).build()
);
GrpcRequestSettings settings = GrpcRequestSettings.newBuilder()
.withTraceId(streamId)
.disableDeadline()
.build();
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamReadMethod(), settings);
}

@Override
Expand Down