diff --git a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java index 0a7451702ec66..2eeb8c470b5d8 100644 --- a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java @@ -16,12 +16,14 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -133,10 +135,11 @@ protected HealthNode createTask( * Returns the node id from the eligible health nodes */ @Override - public PersistentTasksCustomMetadata.Assignment getAssignment( + protected PersistentTasksCustomMetadata.Assignment doGetAssignment( HealthNodeTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData); if (discoveryNode == null) { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index ceeb1a4e27f1b..f3a25caf79bb6 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -171,9 +171,9 @@ public ClusterState execute(ClusterState currentState) { assert (projectId == null && taskExecutor.scope() == PersistentTasksExecutor.Scope.CLUSTER) || (projectId != null && taskExecutor.scope() == PersistentTasksExecutor.Scope.PROJECT) : "inconsistent project-id [" + projectId + "] and task scope [" + taskExecutor.scope() + "]"; - taskExecutor.validate(taskParams, currentState); + taskExecutor.validate(taskParams, currentState, projectId); - Assignment assignment = createAssignment(taskName, taskParams, currentState); + Assignment assignment = createAssignment(taskName, taskParams, currentState, projectId); logger.debug("creating {} persistent task [{}] with assignment [{}]", taskTypeString(projectId), taskName, assignment); return builder.addTask(taskId, taskName, taskParams, assignment).buildAndUpdate(currentState, projectId); } @@ -449,7 +449,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) private Assignment createAssignment( final String taskName, final Params taskParams, - final ClusterState currentState + final ClusterState currentState, + @Nullable final ProjectId projectId ) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); @@ -468,7 +469,7 @@ private Assignment createAssignment( // Task assignment should not rely on node order Randomness.shuffle(candidateNodes); - final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState); + final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState, projectId); assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not"; assert (assignment.getExecutorNode() == null || currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false) @@ -540,8 +541,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) * persistent tasks changed. */ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { - final List allTasks = PersistentTasks.getAllTasks(event.state()).map(Tuple::v2).toList(); - if (allTasks.isEmpty()) { + var projectIdToTasksIterator = PersistentTasks.getAllTasks(event.state()).iterator(); + if (projectIdToTasksIterator.hasNext() == false) { return false; } @@ -553,10 +554,16 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { || event.metadataChanged() || masterChanged) { - for (PersistentTasks tasks : allTasks) { - for (PersistentTask task : tasks.tasks()) { + while (projectIdToTasksIterator.hasNext()) { + var projectIdToTasks = projectIdToTasksIterator.next(); + for (PersistentTask task : projectIdToTasks.v2().tasks()) { if (needsReassignment(task.getAssignment(), event.state().nodes())) { - Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state()); + Assignment assignment = createAssignment( + task.getTaskName(), + task.getParams(), + event.state(), + projectIdToTasks.v1() + ); if (Objects.equals(assignment, task.getAssignment()) == false) { return true; } @@ -602,7 +609,7 @@ private ClusterState reassignClusterOrSingleProjectTasks(@Nullable final Project // We need to check if removed nodes were running any of the tasks and reassign them for (PersistentTask task : tasks.tasks()) { if (needsReassignment(task.getAssignment(), nodes)) { - Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState); + Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState, projectId); if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace( "reassigning {} task {} from node {} to node {}", diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index b58ef7523bf99..96c0767fe65f8 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -10,6 +10,7 @@ package org.elasticsearch.persistent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; @@ -63,7 +64,31 @@ public Scope scope() { *

* The default implementation returns the least loaded data node from amongst the collection of candidate nodes */ - public Assignment getAssignment(Params params, Collection candidateNodes, ClusterState clusterState) { + public final Assignment getAssignment( + Params params, + Collection candidateNodes, + ClusterState clusterState, + @Nullable ProjectId projectId + ) { + assert (scope() == Scope.PROJECT && projectId != null) || (scope() == Scope.CLUSTER && projectId == null) + : "inconsistent project-id [" + projectId + "] and task scope [" + scope() + "]"; + return doGetAssignment(params, candidateNodes, clusterState, projectId); + } + + /** + * Returns the node id where the params has to be executed, + *

+ * The default implementation returns the least loaded data node from amongst the collection of candidate nodes. + *

+ * If {@link #scope()} returns CLUSTER, then {@link ProjectId} will be null. + * If {@link #scope()} returns PROJECT, then {@link ProjectId} will not be null. + */ + protected Assignment doGetAssignment( + Params params, + Collection candidateNodes, + ClusterState clusterState, + @Nullable ProjectId projectId + ) { DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData); if (discoveryNode == null) { return NO_NODE_FOUND; @@ -105,7 +130,7 @@ protected DiscoveryNode selectLeastLoadedNode( *

* Throws an exception if the supplied params cannot be executed on the cluster in the current state. */ - public void validate(Params params, ClusterState clusterState) {} + public void validate(Params params, ClusterState clusterState, @Nullable ProjectId projectId) {} /** * Creates a AllocatedPersistentTask for communicating with task manager diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index b79f2f6517189..c354f7a7d1991 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -1087,7 +1088,12 @@ public Scope scope() { } @Override - public Assignment getAssignment(P params, Collection candidateNodes, ClusterState clusterState) { + protected Assignment doGetAssignment( + P params, + Collection candidateNodes, + ClusterState clusterState, + ProjectId projectId + ) { return fn.apply(params, candidateNodes, clusterState); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index e3189de94b1a6..a6e059444e4da 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.internal.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -326,12 +327,17 @@ public static void setNonClusterStateCondition(boolean nonClusterStateCondition) } @Override - public Assignment getAssignment(TestParams params, Collection candidateNodes, ClusterState clusterState) { + protected Assignment doGetAssignment( + TestParams params, + Collection candidateNodes, + ClusterState clusterState, + ProjectId projectId + ) { if (nonClusterStateCondition == false) { return new Assignment(null, "non cluster state condition prevents assignment"); } if (params == null || params.getExecutorNodeAttr() == null) { - return super.getAssignment(params, candidateNodes, clusterState); + return super.doGetAssignment(params, candidateNodes, clusterState, projectId); } else { DiscoveryNode executorNode = selectLeastLoadedNode( clusterState, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 029ea6dcd6871..717ec4761c87e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -43,6 +44,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; @@ -118,7 +120,7 @@ public ShardFollowTasksExecutor(Client client, ThreadPool threadPool, ClusterSer } @Override - public void validate(ShardFollowTask params, ClusterState clusterState) { + public void validate(ShardFollowTask params, ClusterState clusterState, @Nullable ProjectId projectId) { final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex()); final ShardRouting primaryShard = routingTable.shard(params.getFollowShardId().id()).primaryShard(); if (primaryShard.active() == false) { @@ -129,10 +131,11 @@ public void validate(ShardFollowTask params, ClusterState clusterState) { private static final Assignment NO_ASSIGNMENT = new Assignment(null, "no nodes found with data and remote cluster client roles"); @Override - public Assignment getAssignment( + protected Assignment doGetAssignment( final ShardFollowTask params, - Collection candidateNodes, - final ClusterState clusterState + final Collection candidateNodes, + final ClusterState clusterState, + @Nullable final ProjectId projectId ) { final DiscoveryNode node = selectLeastLoadedNode( clusterState, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java index 630aab4c78f43..7cb549df52301 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -93,7 +94,8 @@ private void runAssignmentTest( final Assignment assignment = executor.getAssignment( mock(ShardFollowTask.class), clusterStateBuilder.nodes().getAllNodes(), - clusterStateBuilder.build() + clusterStateBuilder.build(), + ProjectId.DEFAULT ); consumer.accept(theSpecial, assignment); } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 5f91fb18fd58e..76615876c5255 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -29,6 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.index.shard.ShardId; @@ -116,7 +118,7 @@ protected AllocatedPersistentTask createTask( } @Override - public void validate(DownsampleShardTaskParams params, ClusterState clusterState) { + public void validate(DownsampleShardTaskParams params, ClusterState clusterState, @Nullable ProjectId projectId) { // This is just a pre-check, but doesn't prevent from avoiding from aborting the task when source index disappeared // after initial creation of the persistent task. var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState); @@ -126,10 +128,11 @@ public void validate(DownsampleShardTaskParams params, ClusterState clusterState } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment( + protected PersistentTasksCustomMetadata.Assignment doGetAssignment( final DownsampleShardTaskParams params, final Collection candidateNodes, - final ClusterState clusterState + final ClusterState clusterState, + @Nullable final ProjectId projectId ) { // NOTE: downsampling works by running a task per each shard of the source index. // Here we make sure we assign the task to the actual node holding the shard identified by diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java index 39e92f06ada16..5a4e14dc24015 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java @@ -96,7 +96,7 @@ public void testGetAssignment() { Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY ); - var result = executor.getAssignment(params, Set.of(node), clusterState); + var result = executor.getAssignment(params, Set.of(node), clusterState, projectId); assertThat(result.getExecutorNode(), equalTo(node.getId())); } @@ -128,7 +128,7 @@ public void testGetAssignmentMissingIndex() { Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY ); - var result = executor.getAssignment(params, Set.of(node), clusterState); + var result = executor.getAssignment(params, Set.of(node), clusterState, projectId); assertThat(result.getExecutorNode(), equalTo(node.getId())); assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task")); } @@ -165,7 +165,7 @@ public void testGetStatelessAssignment() { Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY ); - var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState); + var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState, projectId); assertThat(result.getExecutorNode(), nullValue()); // Assign a copy of the shard to a search node @@ -185,7 +185,7 @@ public void testGetStatelessAssignment() { .build() ) .build(); - result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState); + result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState, projectId); assertThat(result.getExecutorNode(), equalTo(searchNode.getId())); } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java index e15a1d36bdb9f..dad16d3cfa83b 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java @@ -9,10 +9,12 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskParams; @@ -86,10 +88,11 @@ protected AllocatedPersistentTask createTask( } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment( + protected PersistentTasksCustomMetadata.Assignment doGetAssignment( SystemIndexMigrationTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { // This should select from master-eligible nodes because we already require all master-eligible nodes to have all plugins installed. // However, due to a misunderstanding, this code as-written needs to run on the master node in particular. This is not a fundamental diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index a15a733cac6c7..be905caeacba0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -30,6 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.injection.guice.Inject; @@ -690,10 +692,11 @@ protected AllocatedPersistentTask createTask( } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment( + protected PersistentTasksCustomMetadata.Assignment doGetAssignment( TaskParams params, Collection candidateNodes, - @SuppressWarnings("HiddenField") ClusterState clusterState + @SuppressWarnings("HiddenField") ClusterState clusterState, + @Nullable ProjectId projectId ) { boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); Optional optionalAssignment = getPotentialAssignment( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index f45c92d3466c6..7a636e18017e1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -494,10 +495,11 @@ public StartDatafeedPersistentTasksExecutor( } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment( + protected PersistentTasksCustomMetadata.Assignment doGetAssignment( StartDatafeedAction.DatafeedParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { return new DatafeedNodeSelector( clusterState, @@ -510,7 +512,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( } @Override - public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { + public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState, @Nullable ProjectId projectId) { new DatafeedNodeSelector( clusterState, resolver, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java index 42f722e330a19..00370dde3e089 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java @@ -15,9 +15,11 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; @@ -88,10 +90,11 @@ public SnapshotUpgradeTaskExecutor( } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment( + protected PersistentTasksCustomMetadata.Assignment doGetAssignment( SnapshotUpgradeTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); Optional optionalAssignment = getPotentialAssignment( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index 0e517b63f6f60..5621da489da7d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -17,9 +17,11 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.license.XPackLicenseState; @@ -121,7 +123,12 @@ public OpenJobPersistentTasksExecutor( } @Override - public Assignment getAssignment(OpenJobAction.JobParams params, Collection candidateNodes, ClusterState clusterState) { + protected Assignment doGetAssignment( + OpenJobAction.JobParams params, + Collection candidateNodes, + ClusterState clusterState, + @Nullable ProjectId projectId + ) { Job job = params.getJob(); // If the task parameters do not have a job field then the job // was first opened on a pre v6.6 node and has not been migrated @@ -210,13 +217,13 @@ static void validateJobAndId(String jobId, Job job) { } @Override - public void validate(OpenJobAction.JobParams params, ClusterState clusterState) { + public void validate(OpenJobAction.JobParams params, ClusterState clusterState, @Nullable ProjectId projectId) { final Job job = params.getJob(); final String jobId = params.getJobId(); validateJobAndId(jobId, job); // If we already know that we can't find an ml node because all ml nodes are running at capacity or // simply because there are no ml nodes in the cluster then we fail quickly here: - PersistentTasksCustomMetadata.Assignment assignment = getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + var assignment = getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, projectId); if (assignment.equals(AWAITING_UPGRADE)) { throw makeCurrentlyBeingUpgradedException(logger, params.getJobId()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java index 33fae40f80db6..550352954bfbc 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -62,7 +63,7 @@ public void testGetAssignment_UpgradeModeIsEnabled() { .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build())) .build(); - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, ProjectId.DEFAULT); assertThat(assignment.getExecutorNode(), is(nullValue())); assertThat(assignment.getExplanation(), is(equalTo("persistent task cannot be assigned while upgrade mode is enabled."))); } @@ -75,7 +76,7 @@ public void testGetAssignment_NoNodes() { .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build())) .build(); - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, ProjectId.DEFAULT); assertThat(assignment.getExecutorNode(), is(nullValue())); assertThat(assignment.getExplanation(), is(emptyString())); } @@ -94,7 +95,7 @@ public void testGetAssignment_NoMlNodes() { ) .build(); - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, ProjectId.DEFAULT); assertThat(assignment.getExecutorNode(), is(nullValue())); assertThat( assignment.getExplanation(), @@ -116,7 +117,7 @@ public void testGetAssignment_MlNodeIsNewerThanTheMlJobButTheAssignmentSuceeds() .nodes(DiscoveryNodes.builder().add(createNode(0, true, Version.V_7_10_0, MlConfigVersion.V_7_10_0))) .build(); - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, ProjectId.DEFAULT); assertThat(assignment.getExecutorNode(), is(equalTo("_node_id0"))); assertThat(assignment.getExplanation(), is(emptyString())); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index d88e1235241d8..4b1ed557ef287 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.OperationRouting; @@ -173,7 +174,7 @@ public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() { assertEquals( "Not opening [unavailable_index_with_lazy_node], " + "because not all primary shards are active for the following indices [.ml-state]", - executor.getAssignment(params, csBuilder.nodes().getAllNodes(), csBuilder.build()).getExplanation() + executor.getAssignment(params, csBuilder.nodes().getAllNodes(), csBuilder.build(), ProjectId.DEFAULT).getExplanation() ); } @@ -195,7 +196,8 @@ public void testGetAssignment_GivenLazyJobAndNoGlobalLazyNodes() { PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment( params, csBuilder.nodes().getAllNodes(), - csBuilder.build() + csBuilder.build(), + ProjectId.DEFAULT ); assertNotNull(assignment); assertNull(assignment.getExecutorNode()); @@ -216,7 +218,8 @@ public void testGetAssignment_GivenResetInProgress() { PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment( params, csBuilder.nodes().getAllNodes(), - csBuilder.build() + csBuilder.build(), + ProjectId.DEFAULT ); assertNotNull(assignment); assertNull(assignment.getExecutorNode()); diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java index 784f1c1fbe23e..10c1a4321f1e5 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -166,13 +167,14 @@ protected TaskExecutor(Client client, ClusterService clusterService, ThreadPool } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment( + protected PersistentTasksCustomMetadata.Assignment doGetAssignment( TestTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + ProjectId projectId ) { candidates.set(candidateNodes); - return super.getAssignment(params, candidateNodes, clusterState); + return super.doGetAssignment(params, candidateNodes, clusterState, projectId); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index b7bd434194b80..495a0db966343 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; @@ -113,10 +114,11 @@ public TransformPersistentTasksExecutor( } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment( + protected PersistentTasksCustomMetadata.Assignment doGetAssignment( TransformTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { /* Note: * diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index fa509143f9ba9..ec4122b3da7f2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -12,10 +12,14 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; @@ -83,6 +87,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase { private static ThreadPool threadPool; private TransformConfigAutoMigration autoMigration; + private ProjectId projectId; @BeforeClass public static void setUpThreadPool() { @@ -106,13 +111,15 @@ public static void tearDownThreadPool() { } @Before - public void initMocks() { + public void setUp() throws Exception { + super.setUp(); autoMigration = mock(); doAnswer(ans -> { ActionListener listener = ans.getArgument(1); listener.onResponse(ans.getArgument(0)); return null; }).when(autoMigration).migrateAndSave(any(), any()); + projectId = randomUniqueProjectId(); } public void testNodeVersionAssignment() { @@ -124,7 +131,8 @@ public void testNodeVersionAssignment() { executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ).getExecutorNode(), equalTo("current-data-node-with-1-tasks") ); @@ -132,7 +140,8 @@ public void testNodeVersionAssignment() { executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ).getExecutorNode(), equalTo("current-data-node-with-0-tasks-transform-remote-disabled") ); @@ -140,7 +149,8 @@ public void testNodeVersionAssignment() { executor.getAssignment( new TransformTaskParams("new-old-task-id", TransformConfigVersion.V_7_7_0, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ).getExecutorNode(), equalTo("past-data-node-1") ); @@ -154,7 +164,8 @@ public void testNodeAssignmentProblems() { Assignment assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), List.of(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -173,7 +184,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), List.of(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -189,7 +201,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -205,7 +218,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNotNull(assignment.getExecutorNode()); assertThat(assignment.getExecutorNode(), equalTo("dedicated-transform-node")); @@ -218,7 +232,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_8_0_0, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -235,7 +250,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_7_5_0, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNotNull(assignment.getExecutorNode()); assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1")); @@ -248,7 +264,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_7_5_0, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -264,7 +281,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNotNull(assignment.getExecutorNode()); assertThat(assignment.getExecutorNode(), equalTo("current-data-node-with-0-tasks-transform-remote-disabled")); @@ -277,7 +295,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_7_5_0, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -299,29 +318,27 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_7_5_0, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNotNull(assignment.getExecutorNode()); assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1")); } public void testVerifyIndicesPrimaryShardsAreActive() { - Metadata.Builder metadata = Metadata.builder(); + Metadata.Builder metadata = metadataWithProject(); RoutingTable.Builder routingTable = RoutingTable.builder(); addIndices(metadata, routingTable); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - csBuilder.routingTable(routingTable.build()); + csBuilder.putRoutingTable(projectId, routingTable.build()); csBuilder.metadata(metadata); ClusterState cs = csBuilder.build(); - assertEquals( - 0, - TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, TestIndexNameExpressionResolver.newInstance()).size() - ); + assertEquals(0, TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, indexNameExpressionResolver()).size()); metadata = Metadata.builder(cs.metadata()); - routingTable = new RoutingTable.Builder(cs.routingTable()); + routingTable = new RoutingTable.Builder(cs.routingTable(projectId)); String indexToRemove = TransformInternalIndexConstants.LATEST_INDEX_NAME; if (randomBoolean()) { routingTable.remove(indexToRemove); @@ -342,11 +359,11 @@ public void testVerifyIndicesPrimaryShardsAreActive() { } csBuilder = ClusterState.builder(cs); - csBuilder.routingTable(routingTable.build()); + csBuilder.putRoutingTable(projectId, routingTable.build()); csBuilder.metadata(metadata); List result = TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive( csBuilder.build(), - TestIndexNameExpressionResolver.newInstance() + indexNameExpressionResolver() ); assertEquals(1, result.size()); assertEquals(indexToRemove, result.get(0)); @@ -441,7 +458,7 @@ private void addIndices(Metadata.Builder metadata, RoutingTable.Builder routingT for (String indexName : indices) { IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); indexMetadata.settings(indexSettings(IndexVersion.current(), 1, 0).put(IndexMetadata.SETTING_INDEX_UUID, "_uuid")); - metadata.put(indexMetadata); + metadata.getProject(projectId).put(indexMetadata); Index index = new Index(indexName, "_uuid"); ShardId shardId = new ShardId(index, 0); ShardRouting shardRouting = ShardRouting.newUnassigned( @@ -556,7 +573,7 @@ private DiscoveryNodes.Builder buildNodes( } private ClusterState buildClusterState(DiscoveryNodes.Builder nodes) { - Metadata.Builder metadata = Metadata.builder().clusterUUID("cluster-uuid"); + Metadata.Builder metadata = metadataWithProject().clusterUUID("cluster-uuid"); RoutingTable.Builder routingTable = RoutingTable.builder(); addIndices(metadata, routingTable); PersistentTasksCustomMetadata.Builder pTasksBuilder = PersistentTasksCustomMetadata.builder() @@ -580,15 +597,19 @@ private ClusterState buildClusterState(DiscoveryNodes.Builder nodes) { ); PersistentTasksCustomMetadata pTasks = pTasksBuilder.build(); - metadata.putCustom(PersistentTasksCustomMetadata.TYPE, pTasks); + metadata.getProject(projectId).putCustom(PersistentTasksCustomMetadata.TYPE, pTasks); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).nodes(nodes); - csBuilder.routingTable(routingTable.build()); + csBuilder.putRoutingTable(projectId, routingTable.build()); csBuilder.metadata(metadata); return csBuilder.build(); } + private Metadata.Builder metadataWithProject() { + return Metadata.builder().put(ProjectMetadata.builder(projectId)); + } + private TransformPersistentTasksExecutor buildTaskExecutor() { var transformServices = transformServices( new InMemoryTransformConfigManager(), @@ -622,11 +643,15 @@ private TransformPersistentTasksExecutor buildTaskExecutor(TransformServices tra clusterService(), Settings.EMPTY, new DefaultTransformExtension(), - TestIndexNameExpressionResolver.newInstance(), + indexNameExpressionResolver(), autoMigration ); } + private IndexNameExpressionResolver indexNameExpressionResolver() { + return TestIndexNameExpressionResolver.newInstance(TestProjectResolvers.singleProjectOnly(projectId)); + } + private ClusterService clusterService() { var clusterService = mock(ClusterService.class); var cSettings = new ClusterSettings(Settings.EMPTY, Set.of(Transform.NUM_FAILURE_RETRIES_SETTING));