Skip to content

Commit 031dac0

Browse files
committed
Introduce banned id manager and checker
1 parent 3cf82d7 commit 031dac0

File tree

8 files changed

+347
-0
lines changed

8 files changed

+347
-0
lines changed

coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java

+6
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class AccessManager {
4040
private final CoordinatorConf coordinatorConf;
4141
private final ClusterManager clusterManager;
4242
private final QuotaManager quotaManager;
43+
private final BannedManager bannedManager;
4344
private final Configuration hadoopConf;
4445
private List<AccessChecker> accessCheckers = Lists.newArrayList();
4546

@@ -53,6 +54,7 @@ public AccessManager(
5354
this.clusterManager = clusterManager;
5455
this.hadoopConf = hadoopConf;
5556
this.quotaManager = quotaManager;
57+
this.bannedManager = new BannedManager(coordinatorConf);
5658
init();
5759
}
5860

@@ -103,6 +105,10 @@ public QuotaManager getQuotaManager() {
103105
return quotaManager;
104106
}
105107

108+
public BannedManager getBannedManager() {
109+
return bannedManager;
110+
}
111+
106112
public void close() throws IOException {
107113
for (AccessChecker checker : accessCheckers) {
108114
checker.close();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator;
19+
20+
import java.util.Collections;
21+
import java.util.Map;
22+
import java.util.Set;
23+
24+
import org.apache.commons.lang3.tuple.Pair;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import org.apache.uniffle.common.util.JavaUtils;
29+
30+
/** BannedManager is a manager for ban the abnormal app. */
31+
public class BannedManager {
32+
private static final Logger LOG = LoggerFactory.getLogger(BannedManager.class);
33+
// versionId -> bannedIds
34+
private volatile Pair<String, Set<String>> bannedIdsFromRest =
35+
Pair.of("0", Collections.emptySet());
36+
private final Map<String, String> bannedIdsFromServer = JavaUtils.newConcurrentMap();
37+
38+
public BannedManager(CoordinatorConf conf) {
39+
LOG.info("BannedManager initialized successfully.");
40+
}
41+
42+
public boolean checkBanned(String id) {
43+
return bannedIdsFromRest.getValue().contains(id) || bannedIdsFromServer.containsKey(id);
44+
}
45+
46+
public void reloadBannedIdsFromRest(Pair<String, Set<String>> newBannedIds) {
47+
bannedIdsFromRest = newBannedIds;
48+
}
49+
50+
public String getBannedIdsFromRestVersion() {
51+
return bannedIdsFromRest.getKey();
52+
}
53+
}

coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java

+10
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,16 @@ public class CoordinatorConf extends RssBaseConf {
256256
.asList()
257257
.defaultValues("appHeartbeat", "heartbeat")
258258
.withDescription("Exclude record rpc audit operation list, separated by ','");
259+
public static final ConfigOption<String> COORDINATOR_ACCESS_BANNED_ID_PROVIDER =
260+
ConfigOptions.key("rss.coordinator.access.bannedIdProvider")
261+
.stringType()
262+
.noDefaultValue()
263+
.withDescription("Get the banned id from Access banned id provider ");
264+
public static final ConfigOption<String> COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN =
265+
ConfigOptions.key("rss.coordinator.access.bannedIdProviderPattern")
266+
.stringType()
267+
.defaultValue("(.*)")
268+
.withDescription("The regular banned id pattern to extract");
259269

260270
public CoordinatorConf() {}
261271

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator.access.checker;
19+
20+
import java.util.regex.Matcher;
21+
import java.util.regex.Pattern;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import org.apache.uniffle.common.util.Constants;
27+
import org.apache.uniffle.coordinator.AccessManager;
28+
import org.apache.uniffle.coordinator.CoordinatorConf;
29+
import org.apache.uniffle.coordinator.access.AccessCheckResult;
30+
import org.apache.uniffle.coordinator.access.AccessInfo;
31+
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
32+
33+
/**
34+
* AccessBannedChecker maintain a list of banned id and update it periodically, it checks the banned
35+
* id in the access request and reject if the id is in the banned list.
36+
*/
37+
public class AccessBannedChecker extends AbstractAccessChecker {
38+
private static final Logger LOG = LoggerFactory.getLogger(AccessBannedChecker.class);
39+
private final AccessManager accessManager;
40+
private final String bannedIdProviderKey;
41+
private final Pattern bannedIdProviderPattern;
42+
43+
public AccessBannedChecker(AccessManager accessManager) throws Exception {
44+
super(accessManager);
45+
this.accessManager = accessManager;
46+
CoordinatorConf conf = accessManager.getCoordinatorConf();
47+
bannedIdProviderKey = conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER);
48+
String bannedIdProviderRegex =
49+
conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN);
50+
bannedIdProviderPattern = Pattern.compile(bannedIdProviderRegex);
51+
52+
LOG.info(
53+
"Construct BannedChecker. BannedIdProviderKey is {}, pattern is {}",
54+
bannedIdProviderKey,
55+
bannedIdProviderRegex);
56+
}
57+
58+
@Override
59+
public AccessCheckResult check(AccessInfo accessInfo) {
60+
if (accessInfo.getExtraProperties() != null
61+
&& accessInfo.getExtraProperties().containsKey(bannedIdProviderKey)) {
62+
String bannedIdPropertyValue = accessInfo.getExtraProperties().get(bannedIdProviderKey);
63+
Matcher matcher = bannedIdProviderPattern.matcher(bannedIdPropertyValue);
64+
if (matcher.find()) {
65+
String bannedId = matcher.group(1);
66+
if (accessManager.getBannedManager() != null
67+
&& accessManager.getBannedManager().checkBanned(bannedId)) {
68+
String msg = String.format("Denied by BannedChecker, accessInfo[%s].", accessInfo);
69+
if (LOG.isDebugEnabled()) {
70+
LOG.debug("BannedIdPropertyValue is {}, {}", bannedIdPropertyValue, msg);
71+
}
72+
CoordinatorMetrics.counterTotalBannedDeniedRequest.inc();
73+
return new AccessCheckResult(false, msg);
74+
}
75+
}
76+
}
77+
78+
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
79+
}
80+
81+
@Override
82+
public void close() {}
83+
}

coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java

+3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class CoordinatorMetrics {
4343
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
4444
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
4545
private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
46+
private static final String TOTAL_BANNED_DENIED_REQUEST = "total_banned_denied_request";
4647
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
4748
public static final String APP_NUM_TO_USER = "app_num";
4849
public static final String USER_LABEL = "user_name";
@@ -57,6 +58,7 @@ public class CoordinatorMetrics {
5758
public static Counter counterTotalCandidatesDeniedRequest;
5859
public static Counter counterTotalQuotaDeniedRequest;
5960
public static Counter counterTotalLoadDeniedRequest;
61+
public static Counter counterTotalBannedDeniedRequest;
6062
public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap();
6163

6264
private static MetricsManager metricsManager;
@@ -118,5 +120,6 @@ private static void setUpMetrics() {
118120
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
119121
counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
120122
counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
123+
counterTotalBannedDeniedRequest = metricsManager.addCounter(TOTAL_BANNED_DENIED_REQUEST);
121124
}
122125
}

coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java

+5
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,9 @@ public Class<CoordinatorServerResource> getCoordinatorServerResource() {
4343
public Class<ApplicationResource> getApplicationResource() {
4444
return ApplicationResource.class;
4545
}
46+
47+
@Path("banned")
48+
public Class<BannedResource> getBannedResource() {
49+
return BannedResource.class;
50+
}
4651
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator.web.resource;
19+
20+
import java.util.Set;
21+
import javax.servlet.ServletContext;
22+
23+
import org.apache.commons.lang3.tuple.Pair;
24+
import org.apache.hbase.thirdparty.javax.ws.rs.GET;
25+
import org.apache.hbase.thirdparty.javax.ws.rs.POST;
26+
import org.apache.hbase.thirdparty.javax.ws.rs.Path;
27+
import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
28+
import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
29+
import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import org.apache.uniffle.common.web.resource.BaseResource;
34+
import org.apache.uniffle.common.web.resource.Response;
35+
import org.apache.uniffle.coordinator.AccessManager;
36+
import org.apache.uniffle.coordinator.BannedManager;
37+
38+
@Path("/banned")
39+
@Produces({MediaType.APPLICATION_JSON})
40+
public class BannedResource extends BaseResource {
41+
private static final Logger LOG = LoggerFactory.getLogger(BannedResource.class);
42+
@Context protected ServletContext servletContext;
43+
44+
@POST
45+
@Path("/reload")
46+
public Response<String> reload(String versionId, Set<String> bannedIds) {
47+
BannedManager bannedManager = getAccessManager().getBannedManager();
48+
if (bannedManager != null && bannedIds != null) {
49+
bannedManager.reloadBannedIdsFromRest(Pair.of(versionId, bannedIds));
50+
LOG.info("reload {} banned ids.", bannedIds.size());
51+
return Response.success("success");
52+
} else {
53+
return Response.fail("bannedManager is not initialized or bannedIds is null.");
54+
}
55+
}
56+
57+
@GET
58+
@Path("/version")
59+
public Response<String> version() {
60+
BannedManager bannedManager = getAccessManager().getBannedManager();
61+
if (bannedManager != null) {
62+
String version = bannedManager.getBannedIdsFromRestVersion();
63+
LOG.info("Get version of banned ids is {}.", version);
64+
return Response.success(version);
65+
} else {
66+
return Response.fail("bannedManager is not initialized.");
67+
}
68+
}
69+
70+
private AccessManager getAccessManager() {
71+
return (AccessManager) servletContext.getAttribute(AccessManager.class.getCanonicalName());
72+
}
73+
}

0 commit comments

Comments
 (0)