Skip to content

Commit

Permalink
Add ComputeGroupMgr
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Feb 25, 2025
1 parent 69ff4f3 commit bf00c27
Show file tree
Hide file tree
Showing 24 changed files with 420 additions and 79 deletions.
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.AdmissionControl;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroupMgr;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
Expand Down Expand Up @@ -532,6 +533,8 @@ public class Env {

private WorkloadGroupMgr workloadGroupMgr;

private ComputeGroupMgr computeGroupMgr;

private WorkloadSchedPolicyMgr workloadSchedPolicyMgr;

private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
Expand Down Expand Up @@ -810,6 +813,7 @@ public Env(boolean isCheckpointCatalog) {
this.statisticsJobAppender = new StatisticsJobAppender();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.computeGroupMgr = new ComputeGroupMgr(systemInfo);
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
this.admissionControl = new AdmissionControl(systemInfo);
Expand Down Expand Up @@ -919,6 +923,10 @@ public AuditEventProcessor getAuditEventProcessor() {
return auditEventProcessor;
}

/**
 * Returns the compute group manager held by this {@code Env}.
 *
 * @return the current value of the {@code computeGroupMgr} field
 */
public ComputeGroupMgr getComputeGroupMgr() {
    return this.computeGroupMgr;
}

/**
 * Returns the workload group manager held by this {@code Env}.
 *
 * @return the current value of the {@code workloadGroupMgr} field
 */
public WorkloadGroupMgr getWorkloadGroupMgr() {
    return this.workloadGroupMgr;
}
Expand Down
15 changes: 8 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
Expand Down Expand Up @@ -114,7 +115,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -3096,19 +3096,20 @@ public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) {
fetchOption.setFetchRowStore(useStoreRow);
fetchOption.setUseTwoPhaseFetch(true);

// get backend by tag
Set<Tag> tagSet = new HashSet<>();
ConnectContext context = ConnectContext.get();
if (context != null) {
tagSet = context.getResourceTags();
if (context == null) {
context = new ConnectContext();
context.setThreadLocalInfo();
}
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.needQueryAvailable()
.setRequireAliveBe()
.addTags(tagSet)
.build();

TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
for (Backend backend : Env.getCurrentSystemInfo().getBackendsByPolicy(policy)) {
ComputeGroup computeGroup = context.getComputeGroupSafely();

for (Backend backend : policy.getCandidateBackends(computeGroup.getBackendList())) {
nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.apache.doris.datasource;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.IndexedPriorityQueue;
import org.apache.doris.common.ResettableRandomizedIterator;
Expand Down Expand Up @@ -155,19 +156,24 @@ public void init() throws UserException {

public void init(List<String> preLocations) throws UserException {
Set<Tag> tags = Sets.newHashSet();
if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
// Some request from stream load(eg, mysql load) may not set user info in ConnectContext
// just ignore it.
if (!Strings.isNullOrEmpty(qualifiedUser)) {
tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
// In cloud mode, a resource tag may be a subgroup under a compute group,
// so the logic that filters BEs by BeSelectionPolicy must be retained.
// In local mode, BEs are filtered by ComputeGroup instead.
if (Config.isCloudMode()) {
if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
// Some requests from stream load (e.g. mysql load) may not set user info in ConnectContext;
// just ignore them.
if (!Strings.isNullOrEmpty(qualifiedUser)) {
tags = Env.getCurrentEnv().getAuth().getComputeGroupTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer");
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer");
}
}

Expand All @@ -184,11 +190,23 @@ public void init(List<String> preLocations) throws UserException {
}

public void init(BeSelectionPolicy policy) throws UserException {
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo()
.getBackendsByCurrentCluster().values().asList()));
ConnectContext ctx = ConnectContext.get();
List<Backend> backendList = null;
if (ctx == null) {
if (Config.isCloudMode()) {
throw new AnalysisException("ConnectContext is null");
} else {
ctx = new ConnectContext();
ctx.setThreadLocalInfo();
}

}
backendList = ctx.getComputeGroup().getBackendList();

backends.addAll(policy.getCandidateBackends(backendList));
if (backends.isEmpty()) {
throw new UserException("No available backends, "
+ "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it");
+ "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it");
}
for (Backend backend : backends) {
assignedWeightPerBackend.put(backend, 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
Expand Down Expand Up @@ -439,18 +440,24 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
throws LoadException {
Backend backend = null;
BeSelectionPolicy policy = null;
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
policy = new BeSelectionPolicy.Builder()
.addTags(userTags)
.setEnableRoundRobin(true)
.needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
List<Long> backendIds;
if (groupCommit) {
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getComputeGroupTags(qualifiedUser);
policy = new BeSelectionPolicy.Builder()
.addTags(userTags)
.setEnableRoundRobin(true)
.needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
} else {
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
ComputeGroup computeGroup = ConnectContext.get().getComputeGroupSafely();
policy = new BeSelectionPolicy.Builder()
.setEnableRoundRobin(true)
.needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
backendIds = Env.getCurrentSystemInfo()
.selectBackendIdsByPolicy(policy, 1, computeGroup.getBackendList());
}
if (backendIds.isEmpty()) {
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -231,6 +232,14 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),

UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
ConnectContext context = ConnectContext.get();
if (context == null) {
context = new ConnectContext();
context.setThreadLocalInfo();
}
if (StringUtils.isEmpty(context.getQualifiedUser())) {
context.setQualifiedUser(getUserInfo().getQualifiedUser());
}
task.init(loadId, attachment.getFileStatusByTable(aggKey),
attachment.getFileNumByTable(aggKey), getUserInfo());
task.settWorkloadGroups(tWorkloadGroups);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

import com.aliyuncs.utils.StringUtils;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand All @@ -83,6 +82,7 @@
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -993,6 +993,14 @@ public TPipelineFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId,
} else {
ConnectContext.get().setCloudCluster(clusterName);
}
} else {
if (ConnectContext.get() == null) {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
ctx.setQualifiedUser(this.getQualifiedUser());
} else if (StringUtils.isEmpty(ConnectContext.get().getQualifiedUser())) {
ConnectContext.get().setQualifiedUser(getQualifiedUser());
}
}

TPipelineFragmentParams planParams = planner.plan(loadId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,12 @@ public long getAvailableBeForTask(long jobId, long previousBeId) throws LoadExce
* @throws LoadException
*/
protected List<Long> getAvailableBackendIds(long jobId) throws LoadException {
// Cloud nodes normally never reach here (see CloudRoutineLoadManager.getAvailableBackendIds);
// the cloud mode check below is only a safeguard.
if (Config.isCloudMode()) {
throw new LoadException("cloud mode should not reach here");
}

RoutineLoadJob job = getJob(jobId);
if (job == null) {
throw new LoadException("job " + jobId + " does not exist");
Expand All @@ -564,13 +570,14 @@ protected List<Long> getAvailableBackendIds(long jobId) throws LoadException {
// For old job, there may be no user info. So we have to use tags from replica allocation
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
} else {
tags = Env.getCurrentEnv().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
tags = Env.getCurrentEnv().getAuth().getComputeGroupTags(job.getUserIdentity().getQualifiedUser());
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
// user may be dropped, or may not set resource tag property.
// Here we fall back to use replica tag
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
}
}

BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().addTags(tags).build();
return Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public static boolean negotiate(ConnectContext context) throws IOException {
}

// set the user's compute group tags, if any
context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser));
context.setComputeGroupTags(Env.getCurrentEnv().getAuth().getComputeGroupTags(qualifiedUser));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,10 +1229,10 @@ public int getCpuResourceLimit(String qualifiedUser) {
}
}

public Set<Tag> getResourceTags(String qualifiedUser) {
public Set<Tag> getComputeGroupTags(String qualifiedUser) {
readLock();
try {
return propertyMgr.getResourceTags(qualifiedUser);
return propertyMgr.getComputeGroupTags(qualifiedUser);
} finally {
readUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
}

newDefaultLoadCluster = value;
} else if (keyArr[0].equalsIgnoreCase(DEFAULT_CLOUD_CLUSTER)) {
} else if (keyArr[0].equalsIgnoreCase(DEFAULT_CLOUD_CLUSTER)) {
newDefaultCloudCluster = checkCloudDefaultCluster(keyArr, value, DEFAULT_CLOUD_CLUSTER, isReplay);
} else if (keyArr[0].equalsIgnoreCase(DEFAULT_COMPUTE_GROUP)) {
newDefaultCloudCluster = checkCloudDefaultCluster(keyArr, value, DEFAULT_COMPUTE_GROUP, isReplay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public int getParallelFragmentExecInstanceNum(String qualifiedUser) {
return existProperty.getParallelFragmentExecInstanceNum();
}

public Set<Tag> getResourceTags(String qualifiedUser) {
public Set<Tag> getComputeGroupTags(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getPropertyIfNull(qualifiedUser, existProperty);
if (existProperty == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.apache.doris.planner.normalize.Normalizer;
import org.apache.doris.planner.normalize.PartitionRangePredicateNormalizer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsDeriveResult;
import org.apache.doris.statistics.StatsRecursiveDerive;
Expand Down Expand Up @@ -102,6 +102,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -757,14 +758,12 @@ private void addScanRangeLocations(Partition partition,
}
String visibleVersionStr = String.valueOf(visibleVersion);

Set<Tag> allowedTags = Sets.newHashSet();
int useFixReplica = -1;
boolean needCheckTags = false;
boolean skipMissingVersion = false;
ConnectContext context = ConnectContext.get();
ComputeGroup computeGroup = null;
if (context != null) {
allowedTags = context.getResourceTags();
needCheckTags = context.isResourceTagsSet();
computeGroup = context.getComputeGroupSafely();
useFixReplica = context.getSessionVariable().useFixReplica;
if (useFixReplica == -1
&& context.getState().isNereids() && context.getSessionVariable().getEnableQueryCache()) {
Expand Down Expand Up @@ -914,10 +913,12 @@ private void addScanRangeLocations(Partition partition,
if (!backend.isMixNode()) {
continue;
}
if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getLocationTag())) {
String beCgName = backend.getComputeGroupName();
if (computeGroup != null && !Config.isCloudMode() && !computeGroup.getComputeGroupNameSet()
.contains(beCgName)) {
String err = String.format(
"Replica on backend %d with tag %s," + " which is not in user's resource tags: %s",
backend.getId(), backend.getLocationTag(), allowedTags);
"Replica on backend %d with tag %s," + " which is not in user's compute groups: %s",
backend.getId(), beCgName, StringUtils.join(computeGroup.getComputeGroupNameSet(), ","));
if (LOG.isDebugEnabled()) {
LOG.debug(err);
}
Expand Down
Loading

0 comments on commit bf00c27

Please sign in to comment.