From 281143b411ce537aa64038d3d5f73cf2e2b54c36 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Wed, 5 Nov 2025 13:01:04 +0300 Subject: [PATCH 1/3] First example of script API --- .../tech/ydb/query/script/ScriptClient.java | 51 +++ .../java/tech/ydb/query/script/ScriptRpc.java | 21 + .../query/script/impl/ScriptClientImpl.java | 149 +++++++ .../ydb/query/script/impl/ScriptRpcImpl.java | 89 +++++ .../script/result/FetchScriptResult.java | 37 ++ .../query/script/result/OperationScript.java | 75 ++++ .../settings/ExecuteScriptSettings.java | 155 +++++++ .../script/settings/FetchScriptSettings.java | 112 ++++++ .../script/settings/FindScriptSettings.java | 21 + .../java/tech/ydb/query/TestExampleData.java | 2 +- .../ydb/query/impl/ScriptExampleTest.java | 378 ++++++++++++++++++ 11 files changed, 1089 insertions(+), 1 deletion(-) create mode 100644 query/src/main/java/tech/ydb/query/script/ScriptClient.java create mode 100644 query/src/main/java/tech/ydb/query/script/ScriptRpc.java create mode 100644 query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java create mode 100644 query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java create mode 100644 query/src/main/java/tech/ydb/query/script/result/FetchScriptResult.java create mode 100644 query/src/main/java/tech/ydb/query/script/result/OperationScript.java create mode 100644 query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java create mode 100644 query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java create mode 100644 query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java create mode 100644 query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java diff --git a/query/src/main/java/tech/ydb/query/script/ScriptClient.java b/query/src/main/java/tech/ydb/query/script/ScriptClient.java new file mode 100644 index 000000000..03a39b91b --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/ScriptClient.java @@ -0,0 +1,51 @@ +package tech.ydb.query.script; + +import tech.ydb.core.Status; +import tech.ydb.query.script.result.OperationScript; +import tech.ydb.query.script.result.FetchScriptResult; +import tech.ydb.query.script.settings.ExecuteScriptSettings; +import tech.ydb.query.script.settings.FetchScriptSettings; +import tech.ydb.query.script.settings.FindScriptSettings; +import tech.ydb.table.query.Params; + + +import java.util.concurrent.CompletableFuture; + +public interface ScriptClient { + + /** + * Find script + * + * @param operationId + * @param settings + * @return + */ + CompletableFuture findScript(String operationId, FindScriptSettings settings); + + /** + * Start script and get entity for operation + * @param query + * @param params + * @param settings + * @return + */ + CompletableFuture startScript(String query, + Params params, + ExecuteScriptSettings settings); + + /** + * Wait for script execution and just give result + * + * @param query + * @param params + * @param settings + * @return + */ + Status startJoinScript(String query, + Params params, + ExecuteScriptSettings settings); + + CompletableFuture fetchScriptResults( + FetchScriptSettings settings); + +} diff --git a/query/src/main/java/tech/ydb/query/script/ScriptRpc.java b/query/src/main/java/tech/ydb/query/script/ScriptRpc.java new file mode 100644 index 000000000..36ec89c7c --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/ScriptRpc.java @@ -0,0 +1,21 @@ +package tech.ydb.query.script; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.operation.Operation; +import tech.ydb.proto.OperationProtos; +import tech.ydb.proto.query.YdbQuery; + +import java.util.concurrent.CompletableFuture; + +public interface ScriptRpc { + + CompletableFuture> getOperation(String operationId); + + CompletableFuture> executeScript( + YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings); + + CompletableFuture> fetchScriptResults( + YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings); +} diff --git a/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java b/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java new file mode 100644 index 000000000..d0906eccd --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java @@ -0,0 +1,149 @@ +package tech.ydb.query.script.impl; + +import com.google.common.base.Strings; +import com.google.protobuf.Duration; + +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.settings.BaseRequestSettings; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.query.script.ScriptClient; +import tech.ydb.query.script.ScriptRpc; +import tech.ydb.query.script.result.FetchScriptResult; +import tech.ydb.query.script.result.OperationScript; +import tech.ydb.query.script.settings.ExecuteScriptSettings; +import tech.ydb.query.script.settings.FindScriptSettings; +import tech.ydb.query.script.settings.FetchScriptSettings; +import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.settings.QueryStatsMode; +import tech.ydb.table.query.Params; + + +import javax.annotation.WillNotClose; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public class ScriptClientImpl implements ScriptClient { + + private final ScriptRpc scriptRpc; + + ScriptClientImpl(ScriptRpc scriptRpc) { + this.scriptRpc = scriptRpc; + } + + public static ScriptClient newClient(@WillNotClose GrpcTransport transport) { + return new ScriptClientImpl(ScriptRpcImpl.useTransport(transport)); + } + + @Override + public CompletableFuture findScript(String operationId, FindScriptSettings settings) { + return scriptRpc.getOperation(operationId).thenApply(OperationScript::new); + } + + @Override + public Status startJoinScript(String query, + Params params, + ExecuteScriptSettings settings) { + return this.startScript(query, params, settings).join().waitForResult(); + } + + @Override + public CompletableFuture startScript(String query, + Params params, + ExecuteScriptSettings settings) { + YdbQuery.ExecuteScriptRequest.Builder request = YdbQuery.ExecuteScriptRequest.newBuilder() + .setExecMode(mapExecMode(settings.getExecMode())) + .setStatsMode(mapStatsMode(settings.getStatsMode())) + .setScriptContent(YdbQuery.QueryContent.newBuilder() + .setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1) + .setText(query) + .build()); + + java.time.Duration ttl = settings.getTtl(); + if (ttl != null) { + request.setResultsTtl(Duration.newBuilder().setNanos(settings.getTtl().getNano())); + } + + String resourcePool = settings.getResourcePool(); + if (resourcePool != null && !resourcePool.isEmpty()) { + request.setPoolId(resourcePool); + } + + request.putAllParameters(params.toPb()); + + GrpcRequestSettings options = makeGrpcRequestSettings(settings); + + return scriptRpc.executeScript(request.build(), options) + .thenApply(OperationScript::new); + } + + @Override + public CompletableFuture fetchScriptResults( + FetchScriptSettings settings) { + YdbQuery.FetchScriptResultsRequest.Builder requestBuilder = YdbQuery.FetchScriptResultsRequest.newBuilder(); + + if (!Strings.isNullOrEmpty(settings.getFetchToken())) { + requestBuilder.setFetchToken(settings.getFetchToken()); + } + + if (settings.getRowsLimit() > 0) { + requestBuilder.setRowsLimit(settings.getRowsLimit()); + } + + requestBuilder.setOperationId(settings.getOperationId()); + + if (settings.getSetResultSetIndex() >= 0) { + requestBuilder.setResultSetIndex(settings.getSetResultSetIndex()); + } + + GrpcRequestSettings options = makeGrpcRequestSettings(settings); + + return scriptRpc.fetchScriptResults(requestBuilder.build(), options) + .thenApply(p -> new FetchScriptResult(p.getValue())); + } + + private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) { + String traceId = settings.getTraceId() == null ? UUID.randomUUID().toString() : settings.getTraceId(); + return GrpcRequestSettings.newBuilder() + .withDeadline(settings.getRequestTimeout()) + .withTraceId(traceId) + .build(); + } + + private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { + switch (mode) { + case EXECUTE: + return YdbQuery.ExecMode.EXEC_MODE_EXECUTE; + case EXPLAIN: + return YdbQuery.ExecMode.EXEC_MODE_EXPLAIN; + case PARSE: + return YdbQuery.ExecMode.EXEC_MODE_PARSE; + case VALIDATE: + return YdbQuery.ExecMode.EXEC_MODE_VALIDATE; + + case UNSPECIFIED: + default: + return YdbQuery.ExecMode.EXEC_MODE_UNSPECIFIED; + } + } + + private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) { + switch (mode) { + case NONE: + return YdbQuery.StatsMode.STATS_MODE_NONE; + case BASIC: + return YdbQuery.StatsMode.STATS_MODE_BASIC; + case FULL: + return YdbQuery.StatsMode.STATS_MODE_FULL; + case PROFILE: + return YdbQuery.StatsMode.STATS_MODE_PROFILE; + + case UNSPECIFIED: + default: + return YdbQuery.StatsMode.STATS_MODE_UNSPECIFIED; + } + } + +} diff --git a/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java b/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java new file mode 100644 index 000000000..bdc6e1ce9 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java @@ -0,0 +1,89 @@ +package tech.ydb.query.script.impl; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationBinder; +import tech.ydb.core.operation.StatusExtractor; +import tech.ydb.proto.OperationProtos; +import tech.ydb.proto.operation.v1.OperationServiceGrpc; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.proto.query.v1.QueryServiceGrpc; +import tech.ydb.query.script.ScriptRpc; + +import javax.annotation.WillNotClose; + +import java.util.concurrent.CompletableFuture; + +public class ScriptRpcImpl implements ScriptRpc { + + private static final StatusExtractor FETCH_SCRIPT = StatusExtractor.of( + YdbQuery.FetchScriptResultsResponse::getStatus, + YdbQuery.FetchScriptResultsResponse::getIssuesList + ); + + private final GrpcTransport transport; + + private ScriptRpcImpl(GrpcTransport grpcTransport) { + this.transport = grpcTransport; + } + + public static ScriptRpcImpl useTransport(@WillNotClose GrpcTransport grpcTransport) { + return new ScriptRpcImpl(grpcTransport); + } + + @Override + public CompletableFuture> getOperation(String operationId) { + OperationProtos.GetOperationRequest request = OperationProtos.GetOperationRequest.newBuilder() + .setId(operationId) + .build(); + + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder().build(); + + return transport + .unaryCall(OperationServiceGrpc.getGetOperationMethod(), settings, request) + .thenApply( + OperationBinder.bindAsync(transport, + OperationProtos.GetOperationResponse::getOperation + )); + } + + /** + * Executes a YQL script using the Query service API. + * + * + * @param request the {@link YdbQuery.ExecuteScriptRequest} containing the script + * @param settings gRPC request settings + * @return a future resolving to an {@link Operation} representing the script execution + */ + @Override + public CompletableFuture> executeScript( + YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings) { + + return transport.unaryCall(QueryServiceGrpc.getExecuteScriptMethod(), settings, request) + .thenApply( + OperationBinder.bindAsync(transport, + op -> op + )); + } + + /** + * Fetches the results of a previously executed script. + * + *

This method retrieves the next portion of script execution results, + * supporting pagination and partial fetch using tokens.

+ * + * @param request the {@link YdbQuery.FetchScriptResultsRequest} specifying the fetch parameters + * @param settings gRPC request settings + * @return a future resolving to {@link Result} containing {@link YdbQuery.FetchScriptResultsResponse} + */ + @Override + public CompletableFuture> fetchScriptResults( + YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings) { + + return transport + .unaryCall(QueryServiceGrpc.getFetchScriptResultsMethod(), settings, request); + } +} diff --git a/query/src/main/java/tech/ydb/query/script/result/FetchScriptResult.java b/query/src/main/java/tech/ydb/query/script/result/FetchScriptResult.java new file mode 100644 index 000000000..353834eb4 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/result/FetchScriptResult.java @@ -0,0 +1,37 @@ +package tech.ydb.query.script.result; + +import tech.ydb.core.Result; +import tech.ydb.proto.StatusCodesProtos; +import tech.ydb.proto.ValueProtos; +import tech.ydb.proto.YdbIssueMessage; +import tech.ydb.proto.query.YdbQuery; + +import java.util.List; + +public class FetchScriptResult { + final YdbQuery.FetchScriptResultsResponse resultsResponse; + + public FetchScriptResult(YdbQuery.FetchScriptResultsResponse value) { + resultsResponse = value; + } + + public ValueProtos.ResultSet getResultSet(){ + return resultsResponse.getResultSet(); + } + + public StatusCodesProtos.StatusIds.StatusCode getStatusCode() { + return resultsResponse.getStatus(); + } + + public List getIssuesList() { + return resultsResponse.getIssuesList(); + } + + public String getNextFetchToken() { + return resultsResponse.getNextFetchToken(); + } + + public long getResultSetIndex() { + return resultsResponse.getResultSetIndex(); + } +} diff --git a/query/src/main/java/tech/ydb/query/script/result/OperationScript.java b/query/src/main/java/tech/ydb/query/script/result/OperationScript.java new file mode 100644 index 000000000..a48b6cc8c --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/result/OperationScript.java @@ -0,0 +1,75 @@ +package tech.ydb.query.script.result; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * + */ +public class OperationScript implements Operation { + + private final Operation operation; + private volatile CompletableFuture futureResultOfScriptExecution; + + public OperationScript(Operation resultOperation) { + this.operation = resultOperation; + } + + public CompletableFuture getStatus() { + if (futureResultOfScriptExecution == null) { + synchronized (this) { + if (futureResultOfScriptExecution == null) { + futureResultOfScriptExecution = OperationTray.fetchOperation( + operation, 1); + } + } + } + return futureResultOfScriptExecution; + } + + public Status waitForResult() { + return getStatus().join(); + } + + public String getId() { + return operation.getId(); + } + + public boolean isReady() { + return operation.isReady(); + } + + @Nullable + @Override + public Status getValue() { + return operation.getValue(); + } + + @Override + public CompletableFuture cancel() { + return operation.cancel(); + } + + @Override + public CompletableFuture forget() { + return operation.forget(); + } + + @Override + public CompletableFuture> fetch() { + return operation.fetch(); + } + + @Override + public Operation transform(Function mapper) { + return null; + } + +} diff --git a/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java new file mode 100644 index 000000000..77c3b11c6 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java @@ -0,0 +1,155 @@ +package tech.ydb.query.script.settings; + +import java.time.Duration; + +import tech.ydb.core.settings.BaseRequestSettings; +import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.settings.QueryStatsMode; + +/** + * Settings for configuring script execution requests. + *

+ * Used by {@code QuerySession.executeScript(...)} and similar APIs. + * + *

Author: Evgeny Kuvardin + */ +public class ExecuteScriptSettings extends BaseRequestSettings { + private final QueryExecMode execMode; + private final QueryStatsMode statsMode; + private final String resourcePool; + private final Duration ttl; + + private ExecuteScriptSettings(Builder builder) { + super(builder); + this.execMode = builder.execMode; + this.statsMode = builder.statsMode; + this.ttl = builder.ttl; + this.resourcePool = builder.resourcePool; + } + + /** + * Returns the execution mode for the script. + * + *

Defines how the script should be processed, e.g. executed, explained, validated, or parsed.

+ * + * @return the {@link QueryExecMode} used for execution + */ + public QueryExecMode getExecMode() { + return this.execMode; + } + + /** + * Returns the time-to-live (TTL) duration for the script results. + * + *

Specifies how long results of the executed script will be kept available + * before automatic cleanup on the server.

+ * + * @return the TTL value, or {@code null} if not set + */ + public Duration getTtl() { + return ttl; + } + + /** + * Returns the statistics collection mode for script execution. + * + *

Determines how detailed execution statistics should be gathered + * (none, basic, full, or profiling level).

+ * + * @return the {@link QueryStatsMode} used for statistics collection + */ + public QueryStatsMode getStatsMode() { + return this.statsMode; + } + + /** + * Returns the name of the resource pool assigned to the script execution. + * + *

Resource pools define isolated resource groups for workload management. + * If not specified, the default pool is used.

+ * + * @return the resource pool name, or {@code null} if not set + */ + public String getResourcePool() { + return this.resourcePool; + } + + + /** + * Creates a new {@link Builder} instance for constructing {@link ExecuteScriptSettings}. + * + * @return a new builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating immutable {@link ExecuteScriptSettings} instances. + *

+ * Provides fluent configuration for script execution settings + */ + public static class Builder extends BaseBuilder { + private QueryExecMode execMode = QueryExecMode.EXECUTE; + private QueryStatsMode statsMode = QueryStatsMode.NONE; + private String resourcePool = null; + private Duration ttl = null; + + /** + * Sets the execution mode for the script. + * + * @param mode the desired execution mode + * @return this builder instance for chaining + * @see QueryExecMode + */ + public Builder withExecMode(QueryExecMode mode) { + this.execMode = mode; + return this; + } + + /** + * Sets the statistics collection mode for the script execution. + * + * @param mode the desired statistics mode + * @return this builder instance for chaining + * @see QueryStatsMode + */ + public Builder withStatsMode(QueryStatsMode mode) { + this.statsMode = mode; + return this; + } + + /** + * Sets the time-to-live (TTL) duration for script results. + * + *

After this duration expires, stored script results may be deleted + * from the server automatically.

+ * + * @param value the TTL duration + * @return this builder instance for chaining + */ + public Builder withTtl(Duration value) { + this.ttl = value; + return this; + } + + /** + * Specifies the resource pool to use for query execution. + *

+ * If no pool is specified, or the ID is empty, or equal to {@code "default"}, + * the unremovable resource pool "default" will be used. + * + * @param poolId resource pool identifier + * @return this builder instance for chaining + */ + public Builder withResourcePool(String poolId) { + this.resourcePool = poolId; + return this; + } + + @Override + public ExecuteScriptSettings build() { + return new ExecuteScriptSettings(this); + } + } +} diff --git a/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java new file mode 100644 index 000000000..8ed711880 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java @@ -0,0 +1,112 @@ +package tech.ydb.query.script.settings; + +import tech.ydb.core.settings.BaseRequestSettings; + +/** + * Settings for configuring the fetch phase of a previously executed YQL script. + *

+ * These settings define which operation results to fetch, pagination options, + * row limits, and which result set index to retrieve. + * Used with {@code QuerySession.fetchScriptResults(...)} and similar APIs. + * + *

Author: Evgeny Kuvardin + */ +public class FetchScriptSettings extends BaseRequestSettings { + private final String operationId; + private final String fetchToken; + private final int rowsLimit; + private final int setResultSetIndex; + + private FetchScriptSettings(Builder builder) { + super(builder); + this.operationId = builder.operationId; + this.fetchToken = builder.fetchToken; + this.rowsLimit = builder.rowsLimit; + this.setResultSetIndex = builder.setResultSetIndex; + } + + /** + * Returns the identifier of the operation whose results should be fetched. + * + *

This ID corresponds to the operation returned by + * {@code QuerySession.executeScript(...)} or a similar asynchronous call.

+ * + * @return the operation ID string + */ + public String getOperationId() { + return operationId; + } + + /** + * Returns the fetch token used to continue fetching paginated results. + * + *

When a previous fetch request indicates more data is available, + * this token can be used to retrieve the next portion of results.

+ * + * @return the fetch token, or an empty string if not set + */ + public String getFetchToken() { + return fetchToken; + } + + /** + * Returns the maximum number of rows to retrieve in this fetch request. + * + *

If not set , the server will use its default limit.

+ * + * @return the maximum number of rows to fetch + */ + public int getRowsLimit() { + return rowsLimit; + } + + /** + * Returns the index of the result set to fetch from the executed script. + * + *

When the executed script produces multiple result sets, + * this value specifies which one to retrieve (starting from 0).

+ * + * @return the result set index + */ + public int getSetResultSetIndex() { + return setResultSetIndex; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends BaseBuilder { + + private int rowsLimit = 0; + private int setResultSetIndex = 0; + private String operationId = ""; + private String fetchToken = ""; + + @Override + public FetchScriptSettings build() { + return new FetchScriptSettings(this); + } + + public Builder withEOperationId(String operationId) { + this.operationId = operationId; + return this; + } + + public Builder withFetchToken(String fetchToken) { + this.fetchToken = fetchToken; + return this; + } + + public Builder withRowsLimit(int rowsLimit) { + this.rowsLimit = rowsLimit; + return this; + } + + public Builder withSetResultSetIndex(int setResultSetIndex) { + this.setResultSetIndex = setResultSetIndex; + return this; + } + + } +} diff --git a/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java new file mode 100644 index 000000000..a600211ae --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java @@ -0,0 +1,21 @@ +package tech.ydb.query.script.settings; + +import tech.ydb.core.settings.OperationSettings; + +public class FindScriptSettings extends OperationSettings { + + private FindScriptSettings(Builder builder) { + super(builder); + } + + public static Builder newBuilder() { + return new Builder().withAsyncMode(true); + } + + public static class Builder extends OperationSettings.OperationBuilder { + @Override + public FindScriptSettings build() { + return new FindScriptSettings(this); + } + } +} diff --git a/query/src/test/java/tech/ydb/query/TestExampleData.java b/query/src/test/java/tech/ydb/query/TestExampleData.java index 3c7684d5f..14a3d6def 100644 --- a/query/src/test/java/tech/ydb/query/TestExampleData.java +++ b/query/src/test/java/tech/ydb/query/TestExampleData.java @@ -4,7 +4,7 @@ import java.util.Arrays; import java.util.List; -final class TestExampleData { +public final class TestExampleData { public static class Series { private final long seriesID; private final String title; diff --git a/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java new file mode 100644 index 000000000..958424af6 --- /dev/null +++ b/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java @@ -0,0 +1,378 @@ +package tech.ydb.query.impl; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.proto.ValueProtos; +import tech.ydb.query.QueryClient; +import tech.ydb.query.TestExampleData; +import tech.ydb.query.script.ScriptClient; +import tech.ydb.query.script.result.OperationScript; +import tech.ydb.query.script.impl.ScriptClientImpl; +import tech.ydb.query.script.result.FetchScriptResult; +import tech.ydb.query.script.settings.ExecuteScriptSettings; +import tech.ydb.query.script.settings.FetchScriptSettings; +import tech.ydb.query.script.settings.FindScriptSettings; +import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.tools.QueryReader; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.table.query.Params; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.impl.ProtoValueReaders; +import tech.ydb.table.values.ListType; +import tech.ydb.table.values.ListValue; +import tech.ydb.table.values.PrimitiveType; +import tech.ydb.table.values.PrimitiveValue; +import tech.ydb.table.values.StructType; +import tech.ydb.test.junit4.GrpcTransportRule; + + +/** + * Integration tests that validate the execution of YQL scripts + * using the YDB Query API and scripting features. + * + *

Tests cover: + *

    + *
  • Script execution with and without parameters
  • + *
  • Error handling in scripts
  • + *
  • Sequential script execution
  • + *
  • Fetching results from executed scripts
  • + *
+ * + *

Author: Evgeny Kuvardin + */ +public class ScriptExampleTest { + + @ClassRule + public final static GrpcTransportRule ydbRule = new GrpcTransportRule(); + + private static QueryClient client; + private static SessionRetryContext retryCtx; + private static ScriptClient scriptClient; + + // Create type for struct of series + StructType seriesType = StructType.of( + "series_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "release_date", PrimitiveType.Date, + "series_info", PrimitiveType.Text + ); + // Create and fill list of series + ListValue seriesData = ListType.of(seriesType).newValue( + TestExampleData.SERIES.stream().map(series -> seriesType.newValue( + "series_id", PrimitiveValue.newUint64(series.seriesID()), + "title", PrimitiveValue.newText(series.title()), + "series_info", PrimitiveValue.newText(series.seriesInfo()), + "release_date", PrimitiveValue.newDate(series.releaseDate()) + )).collect(Collectors.toList()) + ); + + // Create type for struct of season + StructType seasonType = StructType.of( + "series_id", PrimitiveType.Uint64, + "season_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "first_aired", PrimitiveType.Date, + "last_aired", PrimitiveType.Date + ); + // Create and fill list of seasons + ListValue seasonsData = ListType.of(seasonType).newValue( + TestExampleData.SEASONS.stream().map(season -> seasonType.newValue( + "series_id", PrimitiveValue.newUint64(season.seriesID()), + "season_id", PrimitiveValue.newUint64(season.seasonID()), + "title", PrimitiveValue.newText(season.title()), + "first_aired", PrimitiveValue.newDate(season.firstAired()), + "last_aired", PrimitiveValue.newDate(season.lastAired()) + )).collect(Collectors.toList()) + ); + + @BeforeClass + public static void init() { + client = QueryClient.newClient(ydbRule) + .sessionPoolMaxSize(5) + .build(); + retryCtx = SessionRetryContext.create(client).build(); + + scriptClient = ScriptClientImpl.newClient(ydbRule); + + Assert.assertNotNull(client.getScheduler()); + } + + @After + public void clean() { + retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", TxMode.NONE).execute()) + .join(); + retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", TxMode.NONE).execute()) + .join(); + } + + @AfterClass + public static void cleanAll() { + client.close(); + } + + @Test + public void createScript() { + Status status = runCreateSuccessScript(); + Assert.assertTrue(status.isSuccess()); + + String query + = "SELECT series_id, title, release_date " + + "FROM series WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + QueryReader result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join().getValue(); + + ResultSetReader rs = result.getResultSet(0); + + // Check that table exists and contains no data + Assert.assertFalse(rs.next()); + } + + /** + * Ensures that script execution fails when it contains syntax errors. + *

+ * Attempts to execute a malformed YQL script and verifies that the result + * indicates failure. + */ + @Test + public void createScriptShouldFail() { + Status statusOperation = runCreateScript("CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "ZCREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")"); + + Assert.assertFalse(statusOperation.isSuccess()); + + String query + = "SELECT series_id, title, release_date " + + "FROM series WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + + // Check that table exists and contains no data + Assert.assertFalse(result.isSuccess()); + } + + /** + * Validates sequential script execution using QueryClient.executeScript. + *

+ * Creates tables, then inserts data in a separate script execution, and + * verifies data persistence. + */ + @Test + public void createInsertQueryScript() { + runCreateSuccessScript(); + + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .withTtl(Duration.ofSeconds(10)) + .build(); + + Status status = scriptClient.startJoinScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings); + + Assert.assertTrue(status.isSuccess()); + + String query + = "SELECT series_id " + + "FROM seasons WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + ResultSetReader rs = result.getValue().getResultSet(0); + + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); + } + + /** + * Validate that find script is working + *

+ * In this test we start script then try it to find and wait for finish execution + */ + @Test + public void findAndStartScript() { + runCreateSuccessScript(); + + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + CompletableFuture future = scriptClient.startScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_id = 1 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings); + + OperationScript operationScript = future.join(); + + + OperationScript operationScript1 = scriptClient.findScript(operationScript.getId(), FindScriptSettings.newBuilder().build()).join(); + + + Assert.assertEquals(operationScript1.getId(), operationScript.getId()); + + Status status = operationScript1.waitForResult(); + Assert.assertTrue(status.isSuccess()); + } + + /** + * Tests fetching results from an executed script using {@link FetchScriptSettings}. + * + *

Scenario: + *

    + *
  1. Create tables
  2. + *
  3. Insert sample data via parameterized script
  4. + *
  5. Fetch the result set from the executed operation
  6. + *
+ * + * @throws ExecutionException if the script future fails + * @throws InterruptedException if the fetch operation is interrupted + */ + @Test + public void fetchScript() throws ExecutionException, InterruptedException { + runCreateSuccessScript(); + + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + CompletableFuture future = scriptClient.startScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_id = 1 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings); + + OperationScript operationScript = future.join(); + Status status = operationScript.waitForResult(); + + + FetchScriptSettings fetchScriptSettings1 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(0) + .withEOperationId(operationScript.getId()) + .withFetchToken("") + .build(); + + FetchScriptResult fetchScriptResult = checkFetch(fetchScriptSettings1, 1); + + FetchScriptSettings fetchScriptSettings2 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(0) + .withEOperationId(operationScript.getId()) + .withFetchToken(fetchScriptResult.getNextFetchToken()) + .build(); + + checkFetch(fetchScriptSettings2, 2); + } + + private FetchScriptResult checkFetch(FetchScriptSettings fetchScriptSettings, int value) { + FetchScriptResult fetchScriptResult = scriptClient.fetchScriptResults(fetchScriptSettings).join(); + + ValueProtos.ResultSet resultSet = fetchScriptResult.getResultSet(); + Assert.assertEquals(1, resultSet.getRowsCount()); + + ResultSetReader reader = ProtoValueReaders.forResultSet(resultSet); + reader.next(); + Assert.assertEquals(value, reader.getColumn(0).getUint64()); + return fetchScriptResult; + } + + private Status runCreateSuccessScript() { + return runCreateScript("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")"); + } + + private Status runCreateScript(String query) { + return scriptClient.startJoinScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); + } +} From 51257f2de24eff5746bf524e9ef5aff890155e14 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Mon, 17 Nov 2025 23:22:03 +0300 Subject: [PATCH 2/3] Change api + add comments --- .../tech/ydb/query/script/ScriptClient.java | 96 ++++-- .../java/tech/ydb/query/script/ScriptRpc.java | 29 +- .../query/script/impl/ScriptClientImpl.java | 42 ++- .../ydb/query/script/impl/ScriptRpcImpl.java | 37 +-- .../script/result/FetchScriptResult.java | 37 --- .../query/script/result/OperationScript.java | 75 ----- .../query/script/result/ScriptResultPart.java | 40 +++ .../script/settings/FetchScriptSettings.java | 48 +-- .../ydb/query/impl/ScriptExampleTest.java | 283 +++++++++++------- 9 files changed, 352 insertions(+), 335 deletions(-) delete mode 100644 query/src/main/java/tech/ydb/query/script/result/FetchScriptResult.java delete mode 100644 query/src/main/java/tech/ydb/query/script/result/OperationScript.java create mode 100644 query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java diff --git a/query/src/main/java/tech/ydb/query/script/ScriptClient.java b/query/src/main/java/tech/ydb/query/script/ScriptClient.java index 03a39b91b..a509fb43c 100644 --- a/query/src/main/java/tech/ydb/query/script/ScriptClient.java +++ b/query/src/main/java/tech/ydb/query/script/ScriptClient.java @@ -1,51 +1,97 @@ package tech.ydb.query.script; +import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.query.script.result.OperationScript; -import tech.ydb.query.script.result.FetchScriptResult; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; +import tech.ydb.query.script.result.ScriptResultPart; import tech.ydb.query.script.settings.ExecuteScriptSettings; import tech.ydb.query.script.settings.FetchScriptSettings; import tech.ydb.query.script.settings.FindScriptSettings; import tech.ydb.table.query.Params; +import javax.annotation.Nullable; + import java.util.concurrent.CompletableFuture; +/** + * High-level API for executing YQL scripts and retrieving their results. + *

+ * Provides convenience methods for starting script execution, tracking operation status, + * and fetching result sets with pagination support. + *

+ * How to use + *

    + *
  • startQueryScript - starting script execution or findQueryScript if script had already started
  • + *
  • fetchQueryScriptStatus - wait for script execution
  • + *
  • fetchQueryScriptResult - fetch script result if necessary
  • + *
+ *

+ * Example with fetch + *

+ *

    + *
  • Operation operation = scriptClient.startQueryScript("select...",Params.of(...), executeScriptSettings).join()
  • + *
  • Status status = scriptClient.fetchQueryScriptStatus(operation, 1).join()
  • + *
  • Result resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings).join()
  • + *
  • ResultSetReader reader = scriptResultPart.getResultSetReader()
  • + *
  • reader.next()
  • + *
+ *

+ * + * * Example without fetch + * *

+ * *

    + * *
  • Status status = + * scriptClient.startQueryScript("select...",Params.of(...), executeScriptSettings) + * .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1)) + * .join()
  • + * *
+ * *

+ */ public interface ScriptClient { /** - * Find script + * Returns operation metadata for a previously started script execution. * - * @param operationId - * @param settings - * @return + * @param operationId operation identifier + * @param settings request settings + * @return future resolving to operation status */ - CompletableFuture findScript(String operationId, FindScriptSettings settings); + CompletableFuture> findQueryScript(String operationId, FindScriptSettings settings); /** - * Start script and get entity for operation - * @param query - * @param params - * @param settings - * @return + * Starts execution of the given YQL script with optional parameters. + * + * @param query YQL script text + * @param params query parameters + * @param settings execution settings (TTL, resource pool, exec mode) + * @return future resolving to a long-running operation */ - CompletableFuture startScript(String query, - Params params, - ExecuteScriptSettings settings); + CompletableFuture> startQueryScript(String query, + Params params, + ExecuteScriptSettings settings); /** - * Wait for script execution and just give result + * Wait for script execution and return status * - * @param query - * @param params - * @param settings - * @return + * @param operation operation object returned when script started + * @param fetchRateSeconds How often should we check if the operation has finished + * @return future with result of script execution */ - Status startJoinScript(String query, - Params params, - ExecuteScriptSettings settings); + default CompletableFuture fetchQueryScriptStatus(Operation operation, int fetchRateSeconds) { + return OperationTray.fetchOperation(operation, fetchRateSeconds); + } - CompletableFuture fetchScriptResults( - FetchScriptSettings settings); + /** + * Fetches script results incrementally. + * + * @param operation operation object returned when script started + * @param previous previous result part, or {@code null} if fetching from start + * @param settings fetch configuration + * @return future resolving to result part containing a result set fragment + */ + CompletableFuture> fetchQueryScriptResult(Operation operation, + @Nullable ScriptResultPart previous, FetchScriptSettings settings); } diff --git a/query/src/main/java/tech/ydb/query/script/ScriptRpc.java b/query/src/main/java/tech/ydb/query/script/ScriptRpc.java index 36ec89c7c..72723325d 100644 --- a/query/src/main/java/tech/ydb/query/script/ScriptRpc.java +++ b/query/src/main/java/tech/ydb/query/script/ScriptRpc.java @@ -4,18 +4,45 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.operation.Operation; -import tech.ydb.proto.OperationProtos; import tech.ydb.proto.query.YdbQuery; import java.util.concurrent.CompletableFuture; +/** + * Low-level RPC interface for executing YQL scripts and fetching their results using gRPC. + *

+ * Provides direct bindings to the YDB QueryService API + * Used internally by {@link tech.ydb.query.script.ScriptClient} implementations. + * + *

Author: Evgeny Kuvardin + */ public interface ScriptRpc { + /** + * Retrieves a previously created operation by its ID. + * + * @param operationId ID of the operation to fetch + * @return future resolving to the operation metadata and status + */ CompletableFuture> getOperation(String operationId); + /** + * Executes a script as a long-running operation. + * + * @param request execution request describing the script and execution mode {@link YdbQuery.ExecuteScriptRequest} + * @param settings RPC request settings including timeout, trace ID, etc. + * @return future resolving to an {@link Operation} representing the script execution + */ CompletableFuture> executeScript( YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings); + /** + * Fetches partial results for a previously executed script. + * + * @param request fetch request including token, result set index, etc. {@link YdbQuery.FetchScriptResultsRequest} + * @param settings RPC settings for this request + * @return future resolving to the result fetch response {@link Result} of {@link YdbQuery.FetchScriptResultsResponse} + */ CompletableFuture> fetchScriptResults( YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings); } diff --git a/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java b/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java index d0906eccd..e12054bad 100644 --- a/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java +++ b/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java @@ -1,17 +1,17 @@ package tech.ydb.query.script.impl; -import com.google.common.base.Strings; import com.google.protobuf.Duration; +import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.operation.Operation; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.proto.query.YdbQuery; import tech.ydb.query.script.ScriptClient; import tech.ydb.query.script.ScriptRpc; -import tech.ydb.query.script.result.FetchScriptResult; -import tech.ydb.query.script.result.OperationScript; +import tech.ydb.query.script.result.ScriptResultPart; import tech.ydb.query.script.settings.ExecuteScriptSettings; import tech.ydb.query.script.settings.FindScriptSettings; import tech.ydb.query.script.settings.FetchScriptSettings; @@ -19,7 +19,8 @@ import tech.ydb.query.settings.QueryStatsMode; import tech.ydb.table.query.Params; - +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.WillNotClose; import java.util.UUID; @@ -38,21 +39,14 @@ public static ScriptClient newClient(@WillNotClose GrpcTransport transport) { } @Override - public CompletableFuture findScript(String operationId, FindScriptSettings settings) { - return scriptRpc.getOperation(operationId).thenApply(OperationScript::new); + public CompletableFuture> findQueryScript(String operationId, FindScriptSettings settings) { + return scriptRpc.getOperation(operationId); } @Override - public Status startJoinScript(String query, - Params params, - ExecuteScriptSettings settings) { - return this.startScript(query, params, settings).join().waitForResult(); - } - - @Override - public CompletableFuture startScript(String query, - Params params, - ExecuteScriptSettings settings) { + public CompletableFuture> startQueryScript(String query, + Params params, + ExecuteScriptSettings settings) { YdbQuery.ExecuteScriptRequest.Builder request = YdbQuery.ExecuteScriptRequest.newBuilder() .setExecMode(mapExecMode(settings.getExecMode())) .setStatsMode(mapStatsMode(settings.getStatsMode())) @@ -75,24 +69,23 @@ public CompletableFuture startScript(String query, GrpcRequestSettings options = makeGrpcRequestSettings(settings); - return scriptRpc.executeScript(request.build(), options) - .thenApply(OperationScript::new); + return scriptRpc.executeScript(request.build(), options); } @Override - public CompletableFuture fetchScriptResults( - FetchScriptSettings settings) { + public CompletableFuture> fetchQueryScriptResult(@Nonnull Operation operation, + @Nullable ScriptResultPart previous, FetchScriptSettings settings) { YdbQuery.FetchScriptResultsRequest.Builder requestBuilder = YdbQuery.FetchScriptResultsRequest.newBuilder(); - if (!Strings.isNullOrEmpty(settings.getFetchToken())) { - requestBuilder.setFetchToken(settings.getFetchToken()); + if (previous != null && previous.getNextFetchToken() != null) { + requestBuilder.setFetchToken(previous.getNextFetchToken()); } if (settings.getRowsLimit() > 0) { requestBuilder.setRowsLimit(settings.getRowsLimit()); } - requestBuilder.setOperationId(settings.getOperationId()); + requestBuilder.setOperationId(operation.getId()); if (settings.getSetResultSetIndex() >= 0) { requestBuilder.setResultSetIndex(settings.getSetResultSetIndex()); @@ -101,7 +94,7 @@ public CompletableFuture fetchScriptResults( GrpcRequestSettings options = makeGrpcRequestSettings(settings); return scriptRpc.fetchScriptResults(requestBuilder.build(), options) - .thenApply(p -> new FetchScriptResult(p.getValue())); + .thenApply(p -> p.map(ScriptResultPart::new)); } private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) { @@ -145,5 +138,4 @@ private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) { return YdbQuery.StatsMode.STATS_MODE_UNSPECIFIED; } } - } diff --git a/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java b/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java index bdc6e1ce9..a1a0bb50b 100644 --- a/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java +++ b/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java @@ -6,7 +6,6 @@ import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.operation.Operation; import tech.ydb.core.operation.OperationBinder; -import tech.ydb.core.operation.StatusExtractor; import tech.ydb.proto.OperationProtos; import tech.ydb.proto.operation.v1.OperationServiceGrpc; import tech.ydb.proto.query.YdbQuery; @@ -17,12 +16,14 @@ import java.util.concurrent.CompletableFuture; -public class ScriptRpcImpl implements ScriptRpc { +/** + * Default gRPC-based implementation of {@link ScriptRpc}. + *

+ * Uses {@link GrpcTransport} to communicate with YDB QueryService and OperationService. + * Provides async unary calls for executing scripts and retrieving results or operation metadata. + */ - private static final StatusExtractor FETCH_SCRIPT = StatusExtractor.of( - YdbQuery.FetchScriptResultsResponse::getStatus, - YdbQuery.FetchScriptResultsResponse::getIssuesList - ); +public class ScriptRpcImpl implements ScriptRpc { private final GrpcTransport transport; @@ -30,6 +31,12 @@ private ScriptRpcImpl(GrpcTransport grpcTransport) { this.transport = grpcTransport; } + /** + * Creates a new RPC instance bound to the given gRPC transport. + * + * @param grpcTransport transport instance (not closed by this class) + * @return new {@link ScriptRpcImpl} instance + */ public static ScriptRpcImpl useTransport(@WillNotClose GrpcTransport grpcTransport) { return new ScriptRpcImpl(grpcTransport); } @@ -50,14 +57,6 @@ public CompletableFuture> getOperation(String operationId) { )); } - /** - * Executes a YQL script using the Query service API. - * - * - * @param request the {@link YdbQuery.ExecuteScriptRequest} containing the script - * @param settings gRPC request settings - * @return a future resolving to an {@link Operation} representing the script execution - */ @Override public CompletableFuture> executeScript( YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings) { @@ -69,16 +68,6 @@ public CompletableFuture> executeScript( )); } - /** - * Fetches the results of a previously executed script. - * - *

This method retrieves the next portion of script execution results, - * supporting pagination and partial fetch using tokens.

- * - * @param request the {@link YdbQuery.FetchScriptResultsRequest} specifying the fetch parameters - * @param settings gRPC request settings - * @return a future resolving to {@link Result} containing {@link YdbQuery.FetchScriptResultsResponse} - */ @Override public CompletableFuture> fetchScriptResults( YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings) { diff --git a/query/src/main/java/tech/ydb/query/script/result/FetchScriptResult.java b/query/src/main/java/tech/ydb/query/script/result/FetchScriptResult.java deleted file mode 100644 index 353834eb4..000000000 --- a/query/src/main/java/tech/ydb/query/script/result/FetchScriptResult.java +++ /dev/null @@ -1,37 +0,0 @@ -package tech.ydb.query.script.result; - -import tech.ydb.core.Result; -import tech.ydb.proto.StatusCodesProtos; -import tech.ydb.proto.ValueProtos; -import tech.ydb.proto.YdbIssueMessage; -import tech.ydb.proto.query.YdbQuery; - -import java.util.List; - -public class FetchScriptResult { - final YdbQuery.FetchScriptResultsResponse resultsResponse; - - public FetchScriptResult(YdbQuery.FetchScriptResultsResponse value) { - resultsResponse = value; - } - - public ValueProtos.ResultSet getResultSet(){ - return resultsResponse.getResultSet(); - } - - public StatusCodesProtos.StatusIds.StatusCode getStatusCode() { - return resultsResponse.getStatus(); - } - - public List getIssuesList() { - return resultsResponse.getIssuesList(); - } - - public String getNextFetchToken() { - return resultsResponse.getNextFetchToken(); - } - - public long getResultSetIndex() { - return resultsResponse.getResultSetIndex(); - } -} diff --git a/query/src/main/java/tech/ydb/query/script/result/OperationScript.java b/query/src/main/java/tech/ydb/query/script/result/OperationScript.java deleted file mode 100644 index a48b6cc8c..000000000 --- a/query/src/main/java/tech/ydb/query/script/result/OperationScript.java +++ /dev/null @@ -1,75 +0,0 @@ -package tech.ydb.query.script.result; - -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.operation.Operation; -import tech.ydb.core.operation.OperationTray; - -import javax.annotation.Nullable; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -/** - * - */ -public class OperationScript implements Operation { - - private final Operation operation; - private volatile CompletableFuture futureResultOfScriptExecution; - - public OperationScript(Operation resultOperation) { - this.operation = resultOperation; - } - - public CompletableFuture getStatus() { - if (futureResultOfScriptExecution == null) { - synchronized (this) { - if (futureResultOfScriptExecution == null) { - futureResultOfScriptExecution = OperationTray.fetchOperation( - operation, 1); - } - } - } - return futureResultOfScriptExecution; - } - - public Status waitForResult() { - return getStatus().join(); - } - - public String getId() { - return operation.getId(); - } - - public boolean isReady() { - return operation.isReady(); - } - - @Nullable - @Override - public Status getValue() { - return operation.getValue(); - } - - @Override - public CompletableFuture cancel() { - return operation.cancel(); - } - - @Override - public CompletableFuture forget() { - return operation.forget(); - } - - @Override - public CompletableFuture> fetch() { - return operation.fetch(); - } - - @Override - public Operation transform(Function mapper) { - return null; - } - -} diff --git a/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java b/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java new file mode 100644 index 000000000..adfd5cbce --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java @@ -0,0 +1,40 @@ +package tech.ydb.query.script.result; + +import tech.ydb.core.Issue; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.impl.ProtoValueReaders; + +public class ScriptResultPart { + private final ResultSetReader resultSetReader; + private final long resultSetIndex; + private final String nextFetchToken; + private final Issue[] issues; + + public ScriptResultPart(YdbQuery.FetchScriptResultsResponse value) { + this.resultSetReader = ProtoValueReaders.forResultSet(value.getResultSet()); + this.resultSetIndex = value.getResultSetIndex(); + this.nextFetchToken = value.getNextFetchToken(); + this.issues = Issue.fromPb(value.getIssuesList()); + } + + public ResultSetReader getResultSetReader() { + return resultSetReader; + } + + public String getNextFetchToken() { + return nextFetchToken; + } + + public long getResultSetIndex() { + return resultSetIndex; + } + + public boolean hasErrors() { + return issues.length > 0; + } + + public Issue[] getIssues() { + return issues; + } +} diff --git a/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java index 8ed711880..60c204680 100644 --- a/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java +++ b/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java @@ -12,43 +12,15 @@ *

Author: Evgeny Kuvardin */ public class FetchScriptSettings extends BaseRequestSettings { - private final String operationId; - private final String fetchToken; private final int rowsLimit; - private final int setResultSetIndex; + private final long setResultSetIndex; private FetchScriptSettings(Builder builder) { super(builder); - this.operationId = builder.operationId; - this.fetchToken = builder.fetchToken; this.rowsLimit = builder.rowsLimit; this.setResultSetIndex = builder.setResultSetIndex; } - /** - * Returns the identifier of the operation whose results should be fetched. - * - *

This ID corresponds to the operation returned by - * {@code QuerySession.executeScript(...)} or a similar asynchronous call.

- * - * @return the operation ID string - */ - public String getOperationId() { - return operationId; - } - - /** - * Returns the fetch token used to continue fetching paginated results. - * - *

When a previous fetch request indicates more data is available, - * this token can be used to retrieve the next portion of results.

- * - * @return the fetch token, or an empty string if not set - */ - public String getFetchToken() { - return fetchToken; - } - /** * Returns the maximum number of rows to retrieve in this fetch request. * @@ -68,7 +40,7 @@ public int getRowsLimit() { * * @return the result set index */ - public int getSetResultSetIndex() { + public long getSetResultSetIndex() { return setResultSetIndex; } @@ -79,31 +51,19 @@ public static Builder newBuilder() { public static class Builder extends BaseBuilder { private int rowsLimit = 0; - private int setResultSetIndex = 0; - private String operationId = ""; - private String fetchToken = ""; + private long setResultSetIndex = 0; @Override public FetchScriptSettings build() { return new FetchScriptSettings(this); } - public Builder withEOperationId(String operationId) { - this.operationId = operationId; - return this; - } - - public Builder withFetchToken(String fetchToken) { - this.fetchToken = fetchToken; - return this; - } - public Builder withRowsLimit(int rowsLimit) { this.rowsLimit = rowsLimit; return this; } - public Builder withSetResultSetIndex(int setResultSetIndex) { + public Builder withSetResultSetIndex(long setResultSetIndex) { this.setResultSetIndex = setResultSetIndex; return this; } diff --git a/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java index 958424af6..d92890dac 100644 --- a/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java +++ b/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java @@ -1,8 +1,9 @@ package tech.ydb.query.impl; import java.time.Duration; -import java.util.concurrent.CompletableFuture; +import java.util.Arrays; import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.junit.After; @@ -15,13 +16,12 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.proto.ValueProtos; +import tech.ydb.core.operation.Operation; import tech.ydb.query.QueryClient; import tech.ydb.query.TestExampleData; import tech.ydb.query.script.ScriptClient; -import tech.ydb.query.script.result.OperationScript; import tech.ydb.query.script.impl.ScriptClientImpl; -import tech.ydb.query.script.result.FetchScriptResult; +import tech.ydb.query.script.result.ScriptResultPart; import tech.ydb.query.script.settings.ExecuteScriptSettings; import tech.ydb.query.script.settings.FetchScriptSettings; import tech.ydb.query.script.settings.FindScriptSettings; @@ -30,7 +30,6 @@ import tech.ydb.query.tools.SessionRetryContext; import tech.ydb.table.query.Params; import tech.ydb.table.result.ResultSetReader; -import tech.ydb.table.result.impl.ProtoValueReaders; import tech.ydb.table.values.ListType; import tech.ydb.table.values.ListValue; import tech.ydb.table.values.PrimitiveType; @@ -108,39 +107,45 @@ public static void init() { scriptClient = ScriptClientImpl.newClient(ydbRule); Assert.assertNotNull(client.getScheduler()); + + retryCtx.supplyResult(session -> session.createQuery("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ")", TxMode.NONE).execute() + ).join().getStatus().expectSuccess("Can't create table series"); + + retryCtx.supplyResult(session -> session.createQuery("" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")", TxMode.NONE).execute() + ).join().getStatus().expectSuccess("Can't create table seasons"); } @After public void clean() { - retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", TxMode.NONE).execute()) + retryCtx.supplyResult(session -> session.createQuery("delete from series;", TxMode.NONE).execute()) .join(); - retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", TxMode.NONE).execute()) + retryCtx.supplyResult(session -> session.createQuery("delete from seasons;", TxMode.NONE).execute()) .join(); } @AfterClass public static void cleanAll() { - client.close(); - } - - @Test - public void createScript() { - Status status = runCreateSuccessScript(); - Assert.assertTrue(status.isSuccess()); - - String query - = "SELECT series_id, title, release_date " - + "FROM series WHERE series_id = 1"; - - // Executes data query with specified transaction control settings. - QueryReader result = retryCtx.supplyResult( - session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) - ).join().getValue(); - - ResultSetReader rs = result.getResultSet(0); + retryCtx.supplyResult(session -> session.createQuery("drop table series;", TxMode.NONE).execute()) + .join(); + retryCtx.supplyResult(session -> session.createQuery("drop table seasons;", TxMode.NONE).execute()) + .join(); - // Check that table exists and contains no data - Assert.assertFalse(rs.next()); + client.close(); } /** @@ -151,14 +156,14 @@ public void createScript() { */ @Test public void createScriptShouldFail() { - Status statusOperation = runCreateScript("CREATE TABLE series (" + Status statusOperation = runCreateScript("CREATE TABLE series2 (" + " series_id UInt64," + " title Text," + " series_info Text," + " release_date Date," + " PRIMARY KEY(series_id)" + ");" - + "ZCREATE TABLE seasons (" + + "ZCREATE TABLE seasons2 (" + " series_id UInt64," + " season_id UInt64," + " title Text," @@ -171,14 +176,13 @@ public void createScriptShouldFail() { String query = "SELECT series_id, title, release_date " - + "FROM series WHERE series_id = 1"; + + "FROM series2 WHERE series_id = 1"; // Executes data query with specified transaction control settings. Result result = retryCtx.supplyResult( session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) ).join(); - // Check that table exists and contains no data Assert.assertFalse(result.isSuccess()); } @@ -191,31 +195,32 @@ public void createScriptShouldFail() { */ @Test public void createInsertQueryScript() { - runCreateSuccessScript(); - ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() .withExecMode(QueryExecMode.EXECUTE) .withTtl(Duration.ofSeconds(10)) .build(); - Status status = scriptClient.startJoinScript("" - + "DECLARE $values AS List>;" - + "DECLARE $values1 AS List>;" - + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" - + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", - Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings); + Status status = scriptClient.startQueryScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings) + .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1)) + .join(); + Assert.assertNotNull(status); Assert.assertTrue(status.isSuccess()); String query @@ -240,13 +245,11 @@ public void createInsertQueryScript() { */ @Test public void findAndStartScript() { - runCreateSuccessScript(); - ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() .withExecMode(QueryExecMode.EXECUTE) .build(); - CompletableFuture future = scriptClient.startScript("" + Operation operation = scriptClient.startQueryScript("" + "DECLARE $values AS List operation1 = scriptClient.findQueryScript(operation.getId(), FindScriptSettings.newBuilder().build()).join(); - OperationScript operationScript1 = scriptClient.findScript(operationScript.getId(), FindScriptSettings.newBuilder().build()).join(); + Assert.assertEquals(operation.getId(), operation1.getId()); - Assert.assertEquals(operationScript1.getId(), operationScript.getId()); - - Status status = operationScript1.waitForResult(); + Status status = scriptClient.fetchQueryScriptStatus(operation1, 1).join(); Assert.assertTrue(status.isSuccess()); } @@ -286,19 +287,14 @@ public void findAndStartScript() { *
  • Insert sample data via parameterized script
  • *
  • Fetch the result set from the executed operation
  • * - * - * @throws ExecutionException if the script future fails - * @throws InterruptedException if the fetch operation is interrupted */ @Test - public void fetchScript() throws ExecutionException, InterruptedException { - runCreateSuccessScript(); - + public void fetchScript() { ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() .withExecMode(QueryExecMode.EXECUTE) .build(); - CompletableFuture future = scriptClient.startScript("" + Operation operation = scriptClient.startQueryScript("" + "DECLARE $values AS List resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings1) + .join(); + + checkFetch(resultPartResult, 1); + + FetchScriptSettings fetchScriptSettings2 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(resultPartResult.getValue().getResultSetIndex()) + .build(); + + Result resultPartResult1 = scriptClient.fetchQueryScriptResult(operation, resultPartResult.getValue(), fetchScriptSettings2) + .join(); + + checkFetch(resultPartResult1, 2); + } + + @Test + public void fetchScriptWithManyResultSet() { + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + Operation operation = scriptClient.startQueryScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_id = 1 order by series_id;" + + "SELECT season_id FROM seasons where series_id = 2 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings).join(); + + scriptClient.fetchQueryScriptStatus(operation, 1).join(); + + FetchScriptSettings fetchScriptSettings1 = FetchScriptSettings.newBuilder() + .withRowsLimit(10) + .withSetResultSetIndex(0) + .build(); + + Result resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings1) + .join(); + + ScriptResultPart part = resultPartResult.getValue(); + + ResultSetReader reader = part.getResultSetReader(); + + Assert.assertEquals(4, reader.getRowCount()); FetchScriptSettings fetchScriptSettings2 = FetchScriptSettings.newBuilder() + .withRowsLimit(10) + .withSetResultSetIndex(1) + .build(); + + Result resultPartResult1 = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings2) + .join(); + + ScriptResultPart part1 = resultPartResult1.getValue(); + + ResultSetReader reader2 = part1.getResultSetReader(); + + Assert.assertEquals(5, reader2.getRowCount()); + } + + @Test + public void fetchScriptWithError() { + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + Operation operation = scriptClient.startQueryScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_ids = 1 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings).join(); + + Status status = scriptClient.fetchQueryScriptStatus(operation, 1).join(); + + FetchScriptSettings fetchScriptSettings1 = FetchScriptSettings.newBuilder() .withRowsLimit(1) .withSetResultSetIndex(0) - .withEOperationId(operationScript.getId()) - .withFetchToken(fetchScriptResult.getNextFetchToken()) .build(); - checkFetch(fetchScriptSettings2, 2); + Result resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings1) + .join(); + + Assert.assertTrue(resultPartResult.getValue().hasErrors()); + + Assert.assertTrue( + Arrays.stream(resultPartResult.getValue().getIssues()).anyMatch( + issue -> issue.toString().contains("not found: series_ids."))); } - private FetchScriptResult checkFetch(FetchScriptSettings fetchScriptSettings, int value) { - FetchScriptResult fetchScriptResult = scriptClient.fetchScriptResults(fetchScriptSettings).join(); + private void checkFetch(Result resultPartResult, int value) { + ScriptResultPart scriptResultPart = resultPartResult.getValue(); - ValueProtos.ResultSet resultSet = fetchScriptResult.getResultSet(); - Assert.assertEquals(1, resultSet.getRowsCount()); + ResultSetReader reader = scriptResultPart.getResultSetReader(); + Assert.assertEquals(1, reader.getRowCount()); - ResultSetReader reader = ProtoValueReaders.forResultSet(resultSet); reader.next(); Assert.assertEquals(value, reader.getColumn(0).getUint64()); - return fetchScriptResult; } - private Status runCreateSuccessScript() { - return runCreateScript("" - + "CREATE TABLE series (" - + " series_id UInt64," - + " title Text," - + " series_info Text," - + " release_date Date," - + " PRIMARY KEY(series_id)" - + ");" - + "" - + "CREATE TABLE seasons (" - + " series_id UInt64," - + " season_id UInt64," - + " title Text," - + " first_aired Date," - + " last_aired Date," - + " PRIMARY KEY(series_id, season_id)" - + ")"); - } - - private Status runCreateScript(String query) { - return scriptClient.startJoinScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); + private static Status runCreateScript(String query) { + return scriptClient.startQueryScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()) + .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1)) + .join(); } } From 6df40f09c56653413a671cf9ee0e1b7f1eb05227 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Tue, 18 Nov 2025 16:47:37 +0300 Subject: [PATCH 3/3] change javadoc --- .../tech/ydb/query/script/ScriptClient.java | 51 ++++----- .../java/tech/ydb/query/script/ScriptRpc.java | 4 +- .../query/script/impl/ScriptClientImpl.java | 27 +++-- .../ydb/query/script/impl/ScriptRpcImpl.java | 11 +- .../query/script/result/ScriptResultPart.java | 11 ++ .../settings/ExecuteScriptSettings.java | 5 +- .../script/settings/FetchScriptSettings.java | 4 + .../script/settings/FindScriptSettings.java | 7 ++ .../ydb/query/impl/ScriptExampleTest.java | 107 +++++++++++------- 9 files changed, 142 insertions(+), 85 deletions(-) diff --git a/query/src/main/java/tech/ydb/query/script/ScriptClient.java b/query/src/main/java/tech/ydb/query/script/ScriptClient.java index a509fb43c..19aaaa85d 100644 --- a/query/src/main/java/tech/ydb/query/script/ScriptClient.java +++ b/query/src/main/java/tech/ydb/query/script/ScriptClient.java @@ -1,5 +1,10 @@ package tech.ydb.query.script; +import java.util.concurrent.CompletableFuture; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.operation.Operation; @@ -10,11 +15,6 @@ import tech.ydb.query.script.settings.FindScriptSettings; import tech.ydb.table.query.Params; - -import javax.annotation.Nullable; - -import java.util.concurrent.CompletableFuture; - /** * High-level API for executing YQL scripts and retrieving their results. *

    @@ -27,27 +27,21 @@ *

  • fetchQueryScriptStatus - wait for script execution
  • *
  • fetchQueryScriptResult - fetch script result if necessary
  • * - *

    - * Example with fetch - *

    - *

      - *
    • Operation operation = scriptClient.startQueryScript("select...",Params.of(...), executeScriptSettings).join()
    • - *
    • Status status = scriptClient.fetchQueryScriptStatus(operation, 1).join()
    • - *
    • Result resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings).join()
    • - *
    • ResultSetReader reader = scriptResultPart.getResultSetReader()
    • - *
    • reader.next()
    • - *
    - *

    - * - * * Example without fetch - * *

    - * *

      - * *
    • Status status = - * scriptClient.startQueryScript("select...",Params.of(...), executeScriptSettings) - * .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1)) - * .join()
    • - * *
    - * *

    + *

    Example with fetch + *

    {@code
    + *      Operation operation = scriptClient.startQueryScript("select...",Params.of(...), executeScriptSettings).join())
    + *      Status status = scriptClient.fetchQueryScriptStatus(operation, 1).join()
    + *      Result< ScriptResultPart> resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings).join()
    + *      ResultSetReader reader = scriptResultPart.getResultSetReader()
    + *      reader.next()
    + * }
    + *

    Example without fetch + *

    {@code
    + * Status status = scriptClient.startQueryScript("select...",Params.of(...), executeScriptSettings)
    + *                             .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1))
    + *                             .join()
    + * }
    + *

    Author: Evgeny Kuvardin */ public interface ScriptClient { @@ -91,7 +85,8 @@ default CompletableFuture fetchQueryScriptStatus(Operation opera * @param settings fetch configuration * @return future resolving to result part containing a result set fragment */ - CompletableFuture> fetchQueryScriptResult(Operation operation, - @Nullable ScriptResultPart previous, FetchScriptSettings settings); + CompletableFuture> fetchQueryScriptResult(@Nonnull Operation operation, + @Nullable ScriptResultPart previous, + FetchScriptSettings settings); } diff --git a/query/src/main/java/tech/ydb/query/script/ScriptRpc.java b/query/src/main/java/tech/ydb/query/script/ScriptRpc.java index 72723325d..3773c5c9c 100644 --- a/query/src/main/java/tech/ydb/query/script/ScriptRpc.java +++ b/query/src/main/java/tech/ydb/query/script/ScriptRpc.java @@ -1,13 +1,13 @@ package tech.ydb.query.script; +import java.util.concurrent.CompletableFuture; + import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.operation.Operation; import tech.ydb.proto.query.YdbQuery; -import java.util.concurrent.CompletableFuture; - /** * Low-level RPC interface for executing YQL scripts and fetching their results using gRPC. *

    diff --git a/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java b/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java index e12054bad..cea06a66e 100644 --- a/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java +++ b/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java @@ -1,5 +1,12 @@ package tech.ydb.query.script.impl; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.WillNotClose; + import com.google.protobuf.Duration; import tech.ydb.core.Result; @@ -13,19 +20,20 @@ import tech.ydb.query.script.ScriptRpc; import tech.ydb.query.script.result.ScriptResultPart; import tech.ydb.query.script.settings.ExecuteScriptSettings; -import tech.ydb.query.script.settings.FindScriptSettings; import tech.ydb.query.script.settings.FetchScriptSettings; +import tech.ydb.query.script.settings.FindScriptSettings; import tech.ydb.query.settings.QueryExecMode; import tech.ydb.query.settings.QueryStatsMode; import tech.ydb.table.query.Params; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.WillNotClose; - -import java.util.UUID; -import java.util.concurrent.CompletableFuture; - +/** + * Default implementation of {@link ScriptClient} using {@link ScriptRpc} for RPC calls. + *

    + * Handles script execution lifecycle: starting scripts, polling their status, + * and retrieving result sets in streaming fashion. + * + *

    Author: Evgeny Kuvardin + */ public class ScriptClientImpl implements ScriptClient { private final ScriptRpc scriptRpc; @@ -74,7 +82,8 @@ public CompletableFuture> startQueryScript(String query, @Override public CompletableFuture> fetchQueryScriptResult(@Nonnull Operation operation, - @Nullable ScriptResultPart previous, FetchScriptSettings settings) { + @Nullable ScriptResultPart previous, + FetchScriptSettings settings) { YdbQuery.FetchScriptResultsRequest.Builder requestBuilder = YdbQuery.FetchScriptResultsRequest.newBuilder(); if (previous != null && previous.getNextFetchToken() != null) { diff --git a/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java b/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java index a1a0bb50b..e7028d4e2 100644 --- a/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java +++ b/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java @@ -1,5 +1,9 @@ package tech.ydb.query.script.impl; +import java.util.concurrent.CompletableFuture; + +import javax.annotation.WillNotClose; + import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcRequestSettings; @@ -12,17 +16,14 @@ import tech.ydb.proto.query.v1.QueryServiceGrpc; import tech.ydb.query.script.ScriptRpc; -import javax.annotation.WillNotClose; - -import java.util.concurrent.CompletableFuture; - /** * Default gRPC-based implementation of {@link ScriptRpc}. *

    * Uses {@link GrpcTransport} to communicate with YDB QueryService and OperationService. * Provides async unary calls for executing scripts and retrieving results or operation metadata. + * + *

    Author: Evgeny Kuvardin */ - public class ScriptRpcImpl implements ScriptRpc { private final GrpcTransport transport; diff --git a/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java b/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java index adfd5cbce..87adb44fd 100644 --- a/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java +++ b/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java @@ -5,6 +5,17 @@ import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.result.impl.ProtoValueReaders; +/** + * Represents a single portion of script execution results. + *

    + * Contains: + *

      + *
    • a result set reader for the retrieved rows
    • + *
    • index of the result set within the script
    • + *
    • fetch token for retrieving the next portion
    • + *
    • issues returned by the server
    • + *
    + */ public class ScriptResultPart { private final ResultSetReader resultSetReader; private final long resultSetIndex; diff --git a/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java index 77c3b11c6..a328b0e65 100644 --- a/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java +++ b/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java @@ -7,9 +7,10 @@ import tech.ydb.query.settings.QueryStatsMode; /** - * Settings for configuring script execution requests. + * Settings controlling execution of a YQL script. *

    - * Used by {@code QuerySession.executeScript(...)} and similar APIs. + * Used to specify execution mode, statistics collection level, + * result TTL, and resource pool assignment. * *

    Author: Evgeny Kuvardin */ diff --git a/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java index 60c204680..0b530b2ad 100644 --- a/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java +++ b/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java @@ -4,6 +4,7 @@ /** * Settings for configuring the fetch phase of a previously executed YQL script. + * Take a note that script should be executed successfully before fetch result *

    * These settings define which operation results to fetch, pagination options, * row limits, and which result set index to retrieve. @@ -44,6 +45,9 @@ public long getSetResultSetIndex() { return setResultSetIndex; } + /** + * Creates a new builder configured for asynchronous operation requests. + */ public static Builder newBuilder() { return new Builder(); } diff --git a/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java index a600211ae..0dc78404e 100644 --- a/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java +++ b/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java @@ -2,6 +2,13 @@ import tech.ydb.core.settings.OperationSettings; +/** + * Settings for retrieving metadata of a previously started script operation. + *

    + * Extends {@link OperationSettings} and enables async fetching by default. + * + *

    Author: Evgeny Kuvardin + */ public class FindScriptSettings extends OperationSettings { private FindScriptSettings(Builder builder) { diff --git a/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java index d92890dac..862354478 100644 --- a/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java +++ b/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java @@ -2,8 +2,6 @@ import java.time.Duration; import java.util.Arrays; -import java.util.concurrent.ExecutionException; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.junit.After; @@ -42,14 +40,6 @@ * Integration tests that validate the execution of YQL scripts * using the YDB Query API and scripting features. * - *

    Tests cover: - *

      - *
    • Script execution with and without parameters
    • - *
    • Error handling in scripts
    • - *
    • Sequential script execution
    • - *
    • Fetching results from executed scripts
    • - *
    - * *

    Author: Evgeny Kuvardin */ public class ScriptExampleTest { @@ -149,28 +139,36 @@ public static void cleanAll() { } /** - * Ensures that script execution fails when it contains syntax errors. + * Validates that executing a malformed YQL script results in a failure. *

    - * Attempts to execute a malformed YQL script and verifies that the result - * indicates failure. + * Test steps: + *

      + *
    1. Attempts to execute an invalid script containing a syntax error
    2. + *
    3. Verifies that the script operation returns a failed {@link Status}
    4. + *
    5. Performs a follow-up SELECT query on a table that should not exist
    6. + *
    7. Confirms that the query fails, as expected
    8. + *
    + * The test ensures that invalid scripts do not produce side effects. */ @Test public void createScriptShouldFail() { - Status statusOperation = runCreateScript("CREATE TABLE series2 (" - + " series_id UInt64," - + " title Text," - + " series_info Text," - + " release_date Date," - + " PRIMARY KEY(series_id)" - + ");" - + "ZCREATE TABLE seasons2 (" - + " series_id UInt64," - + " season_id UInt64," - + " title Text," - + " first_aired Date," - + " last_aired Date," - + " PRIMARY KEY(series_id, season_id)" - + ")"); + Status statusOperation = scriptClient.startQueryScript("" + + "CREATE TABLE series2 " + + "( series_id UInt64, " + + " title Text, " + + " series_info Text, " + + " release_date Date, " + + " PRIMARY KEY(series_id));" + + "ZCREATE TABLE seasons2 (" + + " series_id UInt64, " + + " season_id UInt64, " + + " title Text, " + + " first_aired Date, " + + " last_aired Date, " + + " PRIMARY KEY(series_id, season_id))", + Params.empty(), ExecuteScriptSettings.newBuilder().build()) + .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1)) + .join(); Assert.assertFalse(statusOperation.isSuccess()); @@ -188,10 +186,15 @@ public void createScriptShouldFail() { } /** - * Validates sequential script execution using QueryClient.executeScript. + * Verifies end-to-end execution of a script that performs UPSERT operations. *

    - * Creates tables, then inserts data in a separate script execution, and - * verifies data persistence. + * Test steps: + *

      + *
    1. Configures an execution script
    2. + *
    3. Runs a script that inserts sample data into multiple tables
    4. + *
    5. Waits for script completion
    6. + *
    7. Executes a SELECT query to confirm that data was inserted
    8. + *
    */ @Test public void createInsertQueryScript() { @@ -238,10 +241,19 @@ public void createInsertQueryScript() { Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); } + /** - * Validate that find script is working + * Validates the ability to find script *

    - * In this test we start script then try it to find and wait for finish execution + * Test steps: + *

      + *
    1. Starts a script that performs UPSERTs and a SELECT query
    2. + *
    3. Fetches the operation using {@code findQueryScript}
    4. + *
    5. Ensures the returned operation ID matches the original
    6. + *
    7. Waits for operation completion and validates success
    8. + *
    + * Confirms that {@link ScriptClient#findQueryScript} correctly locates + * running or completed script operations. */ @Test public void findAndStartScript() { @@ -336,6 +348,16 @@ public void fetchScript() { checkFetch(resultPartResult1, 2); } + /** + * Verifies fetching multiple result sets generated by a single script. + *

    + * Scenario: + *

      + *
    1. Executes a script that produces two independent SELECT result sets
    2. + *
    3. Fetches each result set separately using {@code resultSetIndex}
    4. + *
    5. Validates row counts for each result set
    6. + *
    + */ @Test public void fetchScriptWithManyResultSet() { ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() @@ -393,6 +415,19 @@ public void fetchScriptWithManyResultSet() { Assert.assertEquals(5, reader2.getRowCount()); } + /** + * Ensures that script execution surfaces query errors in result fetch. + *

    + * Test steps: + *

      + *
    1. Executes a script containing an incorrect column reference
    2. + *
    3. Waits for script execution to complete
    4. + *
    5. Fetches the corresponding result set
    6. + *
    7. Validates that the result contains error issues
    8. + *
    9. Checks that the reported issue matches the incorrect column name
    10. + *
    + */ + @Test public void fetchScriptWithError() { ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() @@ -444,10 +479,4 @@ private void checkFetch(Result resultPartResult, int value) { reader.next(); Assert.assertEquals(value, reader.getColumn(0).getUint64()); } - - private static Status runCreateScript(String query) { - return scriptClient.startQueryScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()) - .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1)) - .join(); - } }