Skip to content

[Persistent Tasks] Assign based on ProjectId #130391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
Expand Down Expand Up @@ -133,10 +135,11 @@ protected HealthNode createTask(
* Returns the node id from the eligible health nodes
*/
@Override
public PersistentTasksCustomMetadata.Assignment getAssignment(
protected PersistentTasksCustomMetadata.Assignment doGetAssignment(
HealthNodeTaskParams params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState
ClusterState clusterState,
@Nullable ProjectId projectId
Comment on lines -139 to +142
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably want to split this method up into a cluster-scoped version and a project-scoped version. The health node persistent task is cluster-scoped, so it doesn't really make sense to have a project ID here or in other cluster-scoped persistent tasks (even though it's nullable). Let me know if "cluster-scoped vs. project-scoped persistent tasks" sound unfamiliar to you, then I (or Yang) can explain what they are. But I'll let @ywangd decide whether he agrees or whether he's fine with the nullable project ID like this.

If we decide to split it up, we can one method without a project ID (for cluster-scoped tasks) and one with a ProjectState (instead of a ClusterState and ProjectId). We created ProjectState to avoid passing cluster states together with project IDs.

Copy link
Member Author

@prwhelan prwhelan Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to rework this, I'm not thrilled about passing nulls around. I'd be happy to instead add a method to the parent, something like:

public Assignment getAssignment(Params params, Collection<DiscoveryNode> candidateNodes, ProjectState projectState) {

and then PersistentTasksClusterService can call the ProjectState or ClusterState API depending on the scope of the persistent task? I'm not sure if that is cleaner for the persistent task framework at the base level but it feels cleaner for the implementations.

99% of this PR was written by IntelliJ's refactor button so we'd only be throwing away minutes of work =)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, something like that is what I had in mind as well (perhaps with a different name for clarity, i.e. getProjectScopedAssignment and getClusterScopedAssignment, but that wouldn't be a blocker for me). Curious to hear what Yang thinks of all this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we can go with two separate methods. The PersistentTasksExecutor#scope method can be used to tell the scope of the executor and subsequently call the relevant getXxxAssignment method.

Theoretically we can have a single overriden generic method for project and cluster scoped task executors, if ProjectState and ClusterState shares some interface, e.g. Supplier<ClusterState>. It should help reducing verbosity of the types. We will still need to check the task executor types and pass either ClusterState or ProjectState to the method accordingly. This might be something worth doing in future since it feels like a better type system. But it is definitely outside of this PR.

) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
if (discoveryNode == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ public ClusterState execute(ClusterState currentState) {
assert (projectId == null && taskExecutor.scope() == PersistentTasksExecutor.Scope.CLUSTER)
|| (projectId != null && taskExecutor.scope() == PersistentTasksExecutor.Scope.PROJECT)
: "inconsistent project-id [" + projectId + "] and task scope [" + taskExecutor.scope() + "]";
taskExecutor.validate(taskParams, currentState);
taskExecutor.validate(taskParams, currentState, projectId);

Assignment assignment = createAssignment(taskName, taskParams, currentState);
Assignment assignment = createAssignment(taskName, taskParams, currentState, projectId);
logger.debug("creating {} persistent task [{}] with assignment [{}]", taskTypeString(projectId), taskName, assignment);
return builder.addTask(taskId, taskName, taskParams, assignment).buildAndUpdate(currentState, projectId);
}
Expand Down Expand Up @@ -449,7 +449,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
private <Params extends PersistentTaskParams> Assignment createAssignment(
final String taskName,
final Params taskParams,
final ClusterState currentState
final ClusterState currentState,
@Nullable final ProjectId projectId
) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);

Expand All @@ -468,7 +469,7 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
// Task assignment should not rely on node order
Randomness.shuffle(candidateNodes);

final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState);
final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState, projectId);
assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not";
assert (assignment.getExecutorNode() == null
|| currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false)
Expand Down Expand Up @@ -540,8 +541,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
* persistent tasks changed.
*/
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
final List<PersistentTasks> allTasks = PersistentTasks.getAllTasks(event.state()).map(Tuple::v2).toList();
if (allTasks.isEmpty()) {
var projectIdToTasksIterator = PersistentTasks.getAllTasks(event.state()).iterator();
if (projectIdToTasksIterator.hasNext() == false) {
return false;
}

Expand All @@ -553,10 +554,16 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
|| event.metadataChanged()
|| masterChanged) {

for (PersistentTasks tasks : allTasks) {
for (PersistentTask<?> task : tasks.tasks()) {
while (projectIdToTasksIterator.hasNext()) {
var projectIdToTasks = projectIdToTasksIterator.next();
for (PersistentTask<?> task : projectIdToTasks.v2().tasks()) {
if (needsReassignment(task.getAssignment(), event.state().nodes())) {
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());
Assignment assignment = createAssignment(
task.getTaskName(),
task.getParams(),
event.state(),
projectIdToTasks.v1()
);
if (Objects.equals(assignment, task.getAssignment()) == false) {
return true;
}
Expand Down Expand Up @@ -602,7 +609,7 @@ private ClusterState reassignClusterOrSingleProjectTasks(@Nullable final Project
// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTask<?> task : tasks.tasks()) {
if (needsReassignment(task.getAssignment(), nodes)) {
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState);
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState, projectId);
if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace(
"reassigning {} task {} from node {} to node {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.persistent;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
Expand Down Expand Up @@ -63,7 +64,31 @@ public Scope scope() {
* <p>
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes
*/
public Assignment getAssignment(Params params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
public final Assignment getAssignment(
Params params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState,
@Nullable ProjectId projectId
) {
assert (scope() == Scope.PROJECT && projectId != null) || (scope() == Scope.CLUSTER && projectId == null)
: "inconsistent project-id [" + projectId + "] and task scope [" + scope() + "]";
return doGetAssignment(params, candidateNodes, clusterState, projectId);
}

/**
* Returns the node id where the params has to be executed,
* <p>
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes.
* <p>
* If {@link #scope()} returns CLUSTER, then {@link ProjectId} will be null.
* If {@link #scope()} returns PROJECT, then {@link ProjectId} will not be null.
*/
protected Assignment doGetAssignment(
Params params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState,
@Nullable ProjectId projectId
) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
if (discoveryNode == null) {
return NO_NODE_FOUND;
Expand Down Expand Up @@ -105,7 +130,7 @@ protected DiscoveryNode selectLeastLoadedNode(
* <p>
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
*/
public void validate(Params params, ClusterState clusterState) {}
public void validate(Params params, ClusterState clusterState, @Nullable ProjectId projectId) {}

/**
* Creates a AllocatedPersistentTask for communicating with task manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -1087,7 +1088,12 @@ public Scope scope() {
}

@Override
public Assignment getAssignment(P params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
protected Assignment doGetAssignment(
P params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState,
ProjectId projectId
) {
return fn.apply(params, candidateNodes, clusterState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.client.internal.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -326,12 +327,17 @@ public static void setNonClusterStateCondition(boolean nonClusterStateCondition)
}

@Override
public Assignment getAssignment(TestParams params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
protected Assignment doGetAssignment(
TestParams params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState,
ProjectId projectId
) {
if (nonClusterStateCondition == false) {
return new Assignment(null, "non cluster state condition prevents assignment");
}
if (params == null || params.getExecutorNodeAttr() == null) {
return super.getAssignment(params, candidateNodes, clusterState);
return super.doGetAssignment(params, candidateNodes, clusterState, projectId);
} else {
DiscoveryNode executorNode = selectLeastLoadedNode(
clusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -43,6 +44,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -118,7 +120,7 @@ public ShardFollowTasksExecutor(Client client, ThreadPool threadPool, ClusterSer
}

@Override
public void validate(ShardFollowTask params, ClusterState clusterState) {
public void validate(ShardFollowTask params, ClusterState clusterState, @Nullable ProjectId projectId) {
final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
final ShardRouting primaryShard = routingTable.shard(params.getFollowShardId().id()).primaryShard();
if (primaryShard.active() == false) {
Expand All @@ -129,10 +131,11 @@ public void validate(ShardFollowTask params, ClusterState clusterState) {
private static final Assignment NO_ASSIGNMENT = new Assignment(null, "no nodes found with data and remote cluster client roles");

@Override
public Assignment getAssignment(
protected Assignment doGetAssignment(
final ShardFollowTask params,
Collection<DiscoveryNode> candidateNodes,
final ClusterState clusterState
final Collection<DiscoveryNode> candidateNodes,
final ClusterState clusterState,
@Nullable final ProjectId projectId
) {
final DiscoveryNode node = selectLeastLoadedNode(
clusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
Expand Down Expand Up @@ -93,7 +94,8 @@ private void runAssignmentTest(
final Assignment assignment = executor.getAssignment(
mock(ShardFollowTask.class),
clusterStateBuilder.nodes().getAllNodes(),
clusterStateBuilder.build()
clusterStateBuilder.build(),
ProjectId.DEFAULT
);
consumer.accept(theSpecial, assignment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -116,7 +118,7 @@ protected AllocatedPersistentTask createTask(
}

@Override
public void validate(DownsampleShardTaskParams params, ClusterState clusterState) {
public void validate(DownsampleShardTaskParams params, ClusterState clusterState, @Nullable ProjectId projectId) {
// This is just a pre-check, but doesn't prevent from avoiding from aborting the task when source index disappeared
// after initial creation of the persistent task.
var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState);
Expand All @@ -126,10 +128,11 @@ public void validate(DownsampleShardTaskParams params, ClusterState clusterState
}

@Override
public PersistentTasksCustomMetadata.Assignment getAssignment(
protected PersistentTasksCustomMetadata.Assignment doGetAssignment(
final DownsampleShardTaskParams params,
final Collection<DiscoveryNode> candidateNodes,
final ClusterState clusterState
final ClusterState clusterState,
@Nullable final ProjectId projectId
) {
// NOTE: downsampling works by running a task per each shard of the source index.
// Here we make sure we assign the task to the actual node holding the shard identified by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testGetAssignment() {
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(node), clusterState);
var result = executor.getAssignment(params, Set.of(node), clusterState, projectId);
assertThat(result.getExecutorNode(), equalTo(node.getId()));
}

Expand Down Expand Up @@ -128,7 +128,7 @@ public void testGetAssignmentMissingIndex() {
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(node), clusterState);
var result = executor.getAssignment(params, Set.of(node), clusterState, projectId);
assertThat(result.getExecutorNode(), equalTo(node.getId()));
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
}
Expand Down Expand Up @@ -165,7 +165,7 @@ public void testGetStatelessAssignment() {
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState, projectId);
assertThat(result.getExecutorNode(), nullValue());

// Assign a copy of the shard to a search node
Expand All @@ -185,7 +185,7 @@ public void testGetStatelessAssignment() {
.build()
)
.build();
result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState, projectId);
assertThat(result.getExecutorNode(), equalTo(searchNode.getId()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskParams;
Expand Down Expand Up @@ -86,10 +88,11 @@ protected AllocatedPersistentTask createTask(
}

@Override
public PersistentTasksCustomMetadata.Assignment getAssignment(
protected PersistentTasksCustomMetadata.Assignment doGetAssignment(
SystemIndexMigrationTaskParams params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState
ClusterState clusterState,
@Nullable ProjectId projectId
) {
// This should select from master-eligible nodes because we already require all master-eligible nodes to have all plugins installed.
// However, due to a misunderstanding, this code as-written needs to run on the master node in particular. This is not a fundamental
Expand Down
Loading