From cbddabe898775310b87231a02d4681d3452c9568 Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Fri, 1 Aug 2025 14:16:10 +0800 Subject: [PATCH 01/10] support auto push efficiency --- client/all/pom.xml | 2 +- client/api/pom.xml | 2 +- client/impl/pom.xml | 2 +- client/log/pom.xml | 2 +- client/pom.xml | 2 +- core/pom.xml | 2 +- pom.xml | 2 +- server/common/model/pom.xml | 2 +- server/common/pom.xml | 2 +- server/common/util/pom.xml | 2 +- server/distribution/all/pom.xml | 2 +- server/distribution/pom.xml | 2 +- server/pom.xml | 2 +- server/remoting/api/pom.xml | 2 +- server/remoting/bolt/pom.xml | 2 +- server/remoting/http/pom.xml | 2 +- server/remoting/pom.xml | 2 +- server/server/data/pom.xml | 2 +- server/server/integration/pom.xml | 2 +- server/server/meta/pom.xml | 2 +- server/server/pom.xml | 2 +- server/server/session/pom.xml | 2 +- .../FetchPushEfficiencyConfigService.java | 40 +-- .../push/AutoPushEfficiencyConfig.java | 180 +++++++++++ .../push/AutoPushEfficiencyRegulator.java | 301 ++++++++++++++++++ .../server/session/push/ChangeProcessor.java | 24 +- .../push/PushEfficiencyConfigUpdater.java | 179 +++++++++++ .../push/PushEfficiencyImproveConfig.java | 52 +-- .../server/session/push/PushProcessor.java | 16 +- .../session/src/main/resources/log4j2.xml | 22 +- .../FetchPushEfficiencyConfigServiceTest.java | 8 +- .../push/AutoPushEfficiencyRegulatorTest.java | 110 +++++++ .../push/PushEfficiencyConfigUpdaterTest.java | 101 ++++++ .../session/store/SessionInterestsTest.java | 55 ++-- server/server/shared/pom.xml | 2 +- server/store/api/pom.xml | 2 +- server/store/jdbc/pom.xml | 2 +- server/store/jraft/pom.xml | 2 +- server/store/pom.xml | 2 +- test/pom.xml | 2 +- 40 files changed, 1014 insertions(+), 130 deletions(-) create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java create mode 100644 server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java create mode 100644 server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java diff --git a/client/all/pom.xml b/client/all/pom.xml index 659ef9630..d9a781ea0 100644 --- a/client/all/pom.xml +++ b/client/all/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-client-all - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ${project.groupId}:${project.artifactId} http://github.com/alipay/sofa-registry diff --git a/client/api/pom.xml b/client/api/pom.xml index 813ed1a56..666a9a229 100644 --- a/client/api/pom.xml +++ b/client/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/client/impl/pom.xml b/client/impl/pom.xml index eeb52e65b..c8af6dc12 100644 --- a/client/impl/pom.xml +++ b/client/impl/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/client/log/pom.xml b/client/log/pom.xml index 5a604f2b9..b7ee11855 100644 --- a/client/log/pom.xml +++ b/client/log/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/client/pom.xml b/client/pom.xml index ac8f03fa7..2c19399bd 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -7,7 +7,7 @@ com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 65c0a4404..49f827f4f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index 8993c7615..3117501f8 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT pom diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml index e2bdfa81f..18fcf50d5 100644 --- a/server/common/model/pom.xml +++ b/server/common/model/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-common - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/common/pom.xml b/server/common/pom.xml index 8006db3b7..ebc6d64ae 100644 --- a/server/common/pom.xml +++ b/server/common/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/common/util/pom.xml b/server/common/util/pom.xml index b5dad36f5..db5b9882e 100644 --- a/server/common/util/pom.xml +++ b/server/common/util/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-common - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/distribution/all/pom.xml b/server/distribution/all/pom.xml index 71314cf16..972fef99b 100644 --- a/server/distribution/all/pom.xml +++ b/server/distribution/all/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-distribution - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/distribution/pom.xml b/server/distribution/pom.xml index e8088f885..c116804fa 100644 --- a/server/distribution/pom.xml +++ b/server/distribution/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/pom.xml b/server/pom.xml index dcb3dfae7..553cea954 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -7,7 +7,7 @@ com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml diff --git a/server/remoting/api/pom.xml b/server/remoting/api/pom.xml index 038267cc5..ace1dae3a 100644 --- a/server/remoting/api/pom.xml +++ b/server/remoting/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/remoting/bolt/pom.xml b/server/remoting/bolt/pom.xml index 48e58c09f..bcaca1deb 100644 --- a/server/remoting/bolt/pom.xml +++ b/server/remoting/bolt/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/remoting/http/pom.xml b/server/remoting/http/pom.xml index 7ccbcac31..558d34b3b 100644 --- a/server/remoting/http/pom.xml +++ b/server/remoting/http/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/remoting/pom.xml b/server/remoting/pom.xml index f5c17333f..a98af9f0c 100644 --- a/server/remoting/pom.xml +++ b/server/remoting/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/server/data/pom.xml b/server/server/data/pom.xml index af9af0654..f85abe93e 100644 --- a/server/server/data/pom.xml +++ b/server/server/data/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/server/integration/pom.xml b/server/server/integration/pom.xml index 854ec5ac8..881e6c629 100644 --- a/server/server/integration/pom.xml +++ b/server/server/integration/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/server/meta/pom.xml b/server/server/meta/pom.xml index d4dc470dd..7a06154af 100644 --- a/server/server/meta/pom.xml +++ b/server/server/meta/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/server/pom.xml b/server/server/pom.xml index ba00016bf..b396d367e 100644 --- a/server/server/pom.xml +++ b/server/server/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/server/session/pom.xml b/server/server/session/pom.xml index e3fc57229..d199a3450 100644 --- a/server/server/session/pom.xml +++ b/server/server/session/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java index 0f4542453..4ac87f951 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java @@ -21,10 +21,8 @@ import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.session.push.ChangeProcessor; -import com.alipay.sofa.registry.server.session.push.FirePushService; +import com.alipay.sofa.registry.server.session.push.PushEfficiencyConfigUpdater; import com.alipay.sofa.registry.server.session.push.PushEfficiencyImproveConfig; -import com.alipay.sofa.registry.server.session.push.PushProcessor; import com.alipay.sofa.registry.server.shared.providedata.AbstractFetchSystemPropertyService; import com.alipay.sofa.registry.server.shared.providedata.SystemDataStorage; import com.alipay.sofa.registry.util.JsonUtils; @@ -43,10 +41,7 @@ public class FetchPushEfficiencyConfigService LoggerFactory.getLogger(FetchPushEfficiencyConfigService.class); @Autowired private SessionServerConfig sessionServerConfig; - @Autowired private ChangeProcessor changeProcessor; - @Autowired private PushProcessor pushProcessor; - - @Autowired private FirePushService firePushService; + @Autowired private PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater; public FetchPushEfficiencyConfigService() { super( @@ -87,11 +82,9 @@ protected boolean doProcess(SwitchStorage expect, ProvideData data) { if (!compareAndSet(expect, update)) { return false; } - changeProcessor.setWorkDelayTime(pushEfficiencyImproveConfig); - pushProcessor.setPushTaskDelayTime(pushEfficiencyImproveConfig); - if (firePushService.getRegProcessor() != null) { - firePushService.getRegProcessor().setWorkDelayTime(pushEfficiencyImproveConfig); - } + + this.pushEfficiencyConfigUpdater.updateFromProviderData(pushEfficiencyImproveConfig); + LOGGER.info( "Fetch PushEfficiencyImproveConfig success, prev={}, current={}", expect.pushEfficiencyImproveConfig, @@ -100,27 +93,16 @@ protected boolean doProcess(SwitchStorage expect, ProvideData data) { } @VisibleForTesting - public FetchPushEfficiencyConfigService setChangeProcessor(ChangeProcessor changeProcessor) { - this.changeProcessor = changeProcessor; - return this; - } - - @VisibleForTesting - public FetchPushEfficiencyConfigService setPushProcessor(PushProcessor pushProcessor) { - this.pushProcessor = pushProcessor; - return this; - } - - @VisibleForTesting - public FetchPushEfficiencyConfigService setFirePushService(FirePushService firePushService) { - this.firePushService = firePushService; + public FetchPushEfficiencyConfigService setSessionServerConfig( + SessionServerConfig sessionServerConfig) { + this.sessionServerConfig = sessionServerConfig; return this; } @VisibleForTesting - public FetchPushEfficiencyConfigService setSessionServerConfig( - SessionServerConfig sessionServerConfig) { - this.sessionServerConfig = sessionServerConfig; + public FetchPushEfficiencyConfigService setPushEfficiencyConfigUpdater( + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater) { + this.pushEfficiencyConfigUpdater = pushEfficiencyConfigUpdater; return this; } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java new file mode 100644 index 000000000..2e1396234 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.builder.ToStringBuilder; + +/** + * @author huicha + * @date 2025/7/24 + */ +public class AutoPushEfficiencyConfig { + + private static final int DEFAULT_WINDOW_NUM = 6; + + private static final long DEFAULT_WINDOW_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10L); + + private static final long DEFAULT_PUSH_COUNT_THRESHOLD = 170000L; + + private static final int DEFAULT_DEBOUNCING_TIME_MAX = 1000; + + private static final int DEFAULT_DEBOUNCING_TIME_MIN = 100; + + private static final int DEFAULT_DEBOUNCING_TIME_STEP = 100; + + private static final int DEFAULT_MAX_DEBOUNCING_TIME_MAX = 3000; + + private static final int DEFAULT_MAX_DEBOUNCING_TIME_MIN = 1000; + + private static final int DEFAULT_MAX_DEBOUNCING_TIME_STEP = 200; + + private boolean enableAutoPushEfficiency = false; + + private int windowNum = DEFAULT_WINDOW_NUM; + + private long windowTimeMillis = DEFAULT_WINDOW_TIME_MILLIS; + + private long pushCountThreshold = DEFAULT_PUSH_COUNT_THRESHOLD; + + // 启动攒批时长的自动化调整 + private boolean enableDebouncingTime = false; + + // 攒批时长的最大值 + private int debouncingTimeMax = DEFAULT_DEBOUNCING_TIME_MAX; + + // 攒批时长的最小值 + private int debouncingTimeMin = DEFAULT_DEBOUNCING_TIME_MIN; + + // 调整攒批时长的步长 + private int debouncingTimeStep = DEFAULT_DEBOUNCING_TIME_STEP; + + // 启动最大攒批时长的自动化调整 + // 可以看下下面这个方法,最大攒批时长,和攒批时长是两个不同的指标 + // @see com.alipay.sofa.registry.server.session.push.ChangeProcessor.Worker.setChangeTaskWorkDelay + private boolean enableMaxDebouncingTime = false; + + // 最大攒批时长的最大值 + private int maxDebouncingTimeMax = DEFAULT_MAX_DEBOUNCING_TIME_MAX; + + // 最大攒批时长的最小值 + private int maxDebouncingTimeMin = DEFAULT_MAX_DEBOUNCING_TIME_MIN; + + // 最大调整攒批时长的步长 + private int maxDebouncingTimeStep = DEFAULT_MAX_DEBOUNCING_TIME_STEP; + + public boolean isEnableAutoPushEfficiency() { + return enableAutoPushEfficiency; + } + + public void setEnableAutoPushEfficiency(boolean enableAutoPushEfficiency) { + this.enableAutoPushEfficiency = enableAutoPushEfficiency; + } + + public int getWindowNum() { + return windowNum; + } + + public void setWindowNum(int windowNum) { + this.windowNum = windowNum; + } + + public long getWindowTimeMillis() { + return windowTimeMillis; + } + + public void setWindowTimeMillis(long windowTimeMillis) { + this.windowTimeMillis = windowTimeMillis; + } + + public long getPushCountThreshold() { + return pushCountThreshold; + } + + public void setPushCountThreshold(long pushCountThreshold) { + this.pushCountThreshold = pushCountThreshold; + } + + public boolean isEnableDebouncingTime() { + return enableDebouncingTime; + } + + public void setEnableDebouncingTime(boolean enableDebouncingTime) { + this.enableDebouncingTime = enableDebouncingTime; + } + + public int getDebouncingTimeMax() { + return debouncingTimeMax; + } + + public void setDebouncingTimeMax(int debouncingTimeMax) { + this.debouncingTimeMax = debouncingTimeMax; + } + + public int getDebouncingTimeMin() { + return debouncingTimeMin; + } + + public void setDebouncingTimeMin(int debouncingTimeMin) { + this.debouncingTimeMin = debouncingTimeMin; + } + + public int getDebouncingTimeStep() { + return debouncingTimeStep; + } + + public void setDebouncingTimeStep(int debouncingTimeStep) { + this.debouncingTimeStep = debouncingTimeStep; + } + + public boolean isEnableMaxDebouncingTime() { + return enableMaxDebouncingTime; + } + + public void setEnableMaxDebouncingTime(boolean enableMaxDebouncingTime) { + this.enableMaxDebouncingTime = enableMaxDebouncingTime; + } + + public int getMaxDebouncingTimeMax() { + return maxDebouncingTimeMax; + } + + public void setMaxDebouncingTimeMax(int maxDebouncingTimeMax) { + this.maxDebouncingTimeMax = maxDebouncingTimeMax; + } + + public int getMaxDebouncingTimeMin() { + return maxDebouncingTimeMin; + } + + public void setMaxDebouncingTimeMin(int maxDebouncingTimeMin) { + this.maxDebouncingTimeMin = maxDebouncingTimeMin; + } + + public int getMaxDebouncingTimeStep() { + return maxDebouncingTimeStep; + } + + public void setMaxDebouncingTimeStep(int maxDebouncingTimeStep) { + this.maxDebouncingTimeStep = maxDebouncingTimeStep; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java new file mode 100644 index 000000000..38dbfdef0 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.util.ConcurrentUtils; +import com.alipay.sofa.registry.util.LoopRunnable; +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 推送流控配置 + * + * @author huicha + * @date 2025/7/23 + */ +public class AutoPushEfficiencyRegulator extends LoopRunnable { + + private static final Logger LOGGER = LoggerFactory.getLogger("AUTO-PUSH-EFFICIENCY-REGULATOR"); + + private static final AtomicLong ID_GENERATOR = new AtomicLong(0); + + // 窗口的时长 (毫秒) + private final long windowTime; + + // 窗口数量 + private final int windowNum; + + // 窗口; 用于统计每个窗口内的推送次数 + private final AtomicLong[] windows; + + // 当前窗口的索引 + private final AtomicInteger index; + + // 阈值; 推送次数高于这个阈值的时候会开始逐渐调整攒批配置 + private final long pushCountThreshold; + + // 预热次数,等到所有的窗口都轮换过一遍之后才能开始统计 + // 这里因为 warmupTimes 的值总是单线程读写的,因此没有加 volatile 关键字 + private int warmupTimes; + + // 唯一 ID + private final Long id; + + // 推送效率配置更新器 + private final PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater; + + // 攒批时长 + private final IntMetric debouncingTime; + + // 最大攒批时长 + private final IntMetric maxDebouncingTime; + + public AutoPushEfficiencyRegulator( + AutoPushEfficiencyConfig autoPushEfficiencyConfig, + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater) { + // 初始化窗口相关配置 + this.windowTime = autoPushEfficiencyConfig.getWindowTimeMillis(); + this.windowNum = autoPushEfficiencyConfig.getWindowNum(); + this.windows = new AtomicLong[windowNum]; + for (int i = 0; i < windowNum; i++) { + this.windows[i] = new AtomicLong(0); + } + this.index = new AtomicInteger(0); + + // 设置其他参数 + this.id = ID_GENERATOR.incrementAndGet(); + this.pushCountThreshold = autoPushEfficiencyConfig.getPushCountThreshold(); + this.warmupTimes = 0; + this.pushEfficiencyConfigUpdater = pushEfficiencyConfigUpdater; + + // 初始化可能需要调整的指标 + this.debouncingTime = + new IntMetric( + autoPushEfficiencyConfig.isEnableDebouncingTime(), + autoPushEfficiencyConfig.getDebouncingTimeMax(), + autoPushEfficiencyConfig.getDebouncingTimeMin(), + autoPushEfficiencyConfig.getDebouncingTimeStep()); + this.maxDebouncingTime = + new IntMetric( + autoPushEfficiencyConfig.isEnableMaxDebouncingTime(), + autoPushEfficiencyConfig.getMaxDebouncingTimeMax(), + autoPushEfficiencyConfig.getMaxDebouncingTimeMin(), + autoPushEfficiencyConfig.getMaxDebouncingTimeStep()); + + // 启动定时任务 + ConcurrentUtils.createDaemonThread("AutoPushEfficiencyRegulator-" + this.id, this).start(); + } + + public void safeIncrementPushCount() { + try { + if (this.isClosed()) { + return; + } + int currentIndex = this.index.get(); + this.windows[currentIndex].incrementAndGet(); + } catch (Throwable throwable) { + LOGGER.error( + "[module=AutoPushEfficiencyRegulator][method=safeIncrementPushCount] increment push count exception", + throwable); + } + } + + /** 滚动窗口 */ + private void rollWindow() { + // 1. 获取当前窗口的索引 + int currentIndex = this.index.get(); + + // 2. 计算出下一个窗口的索引 + int newIndex = currentIndex + 1; + if (newIndex >= this.windowNum) { + newIndex = 0; + } + + // 3. 先清空下一个窗口的统计值,然后再更新索引 + this.windows[newIndex].set(0); + this.index.set(newIndex); + } + + private boolean checkPushCountIsHigh() { + long totalPushCount = 0; + for (int forIndex = 0; forIndex < this.windows.length; forIndex++) { + totalPushCount += this.windows[forIndex].get(); + } + return totalPushCount > this.pushCountThreshold; + } + + private void updateDebouncingTime(String tag) { + int debouncingTime = this.debouncingTime.load(); + int maxDebouncingTime = this.maxDebouncingTime.load(); + LOGGER.info( + "[ID: {}][{}] debouncingTime: {} maxDebouncingTime: {}", + this.id, + tag, + debouncingTime, + maxDebouncingTime); + this.pushEfficiencyConfigUpdater.updateDebouncingTime(debouncingTime, maxDebouncingTime); + } + + public Long getId() { + return id; + } + + @Override + public void runUnthrowable() { + if (this.isClosed()) { + // 如果任务已经被停止了,那么这个进程需要退出 + return; + } + + // 1. 检查是否所有的窗口都轮换过了,即是否完成了预热 + if (this.warmupTimes < this.windowNum) { + // 如果还没有,那么直接滚动窗口,不需要去检查推送频率 + this.rollWindow(); + this.warmupTimes++; + return; + } + + // 2. 已经完成预热了,检查推送频率是否过高 + if (this.checkPushCountIsHigh()) { + // 推送频率过高,尝试更新攒批时长 + boolean dataChange = false; + + if (debouncingTime.tryIncrement()) { + dataChange = true; + } + + if (maxDebouncingTime.tryIncrement()) { + dataChange = true; + } + + if (dataChange) { + this.updateDebouncingTime("Increment"); + } + } else { + // 推送频率正常,此时尝试逐渐降低攒批时长 + boolean dataChange = false; + + if (debouncingTime.tryDecrement()) { + dataChange = true; + } + + if (maxDebouncingTime.tryDecrement()) { + dataChange = true; + } + + if (dataChange) { + this.updateDebouncingTime("Decrement"); + } + } + + // 3. 滚动窗口 + // 这里放到最后滚动窗口是因为: + // 滚动窗口时,会把最新的窗口计数清零,如果先滚动后检查推送频率, + // 那么感知到的推送频率就会偏小一点 + this.rollWindow(); + } + + @Override + public void waitingUnthrowable() { + ConcurrentUtils.sleepUninterruptibly(this.windowTime, TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + public int getWindowNum() { + return this.windowNum; + } + + @VisibleForTesting + public int getWindowsSize() { + return this.windows.length; + } + + @VisibleForTesting + public long getPushCountThreshold() { + return this.pushCountThreshold; + } +} + +class IntMetric { + + private final boolean enable; + + // 指标的最大值 + private final int max; + + // 指标的最小值 + private final int min; + + // 指标的步长 + private final int step; + + // 当前指标的值 + private int current; + + public IntMetric(boolean enable, int max, int min, int step) { + this.enable = enable; + this.max = max; + this.min = min; + this.step = step; + this.current = min; + } + + public boolean tryIncrement() { + if (!this.enable) { + return false; + } + + if (this.current < this.max) { + int newValue = this.current + this.step; + if (newValue > this.max) { + newValue = this.max; + } + this.current = newValue; + return true; + } else { + return false; + } + } + + public boolean tryDecrement() { + if (!this.enable) { + return false; + } + if (this.current > this.min) { + int newValue = this.current - this.step; + if (newValue < this.min) { + newValue = this.min; + } + this.current = newValue; + return true; + } else { + return false; + } + } + + public boolean isEnable() { + return this.enable; + } + + public int load() { + return this.current; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java index 4f6f3efc5..57e3b9cd2 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java @@ -23,11 +23,7 @@ import com.alipay.sofa.registry.util.StringFormatter; import com.alipay.sofa.registry.util.WakeUpLoopRunnable; import com.google.common.collect.Maps; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; @@ -68,6 +64,18 @@ public void setWorkDelayTime(PushEfficiencyImproveConfig pushEfficiencyImproveCo } } + public void setChangeDebouncingMillis(int changeDebouncingMillis, int changeDebouncingMaxMillis) { + for (Map.Entry entry : dataCenterWorkers.entrySet()) { + Worker[] workers = entry.getValue(); + if (workers == null) { + return; + } + for (Worker work : workers) { + work.setChangeDebouncingMillis(changeDebouncingMillis, changeDebouncingMaxMillis); + } + } + } + boolean fireChange(String dataInfoId, ChangeHandler handler, TriggerPushContext changeCtx) { ChangeKey key = new ChangeKey(changeCtx.dataCenters(), dataInfoId); Worker worker = workerOf(key); @@ -121,6 +129,12 @@ public void setChangeTaskWorkDelay(PushEfficiencyImproveConfig pushEfficiencyImp this.changeTaskWaitingMillis = pushEfficiencyImproveConfig.getChangeTaskWaitingMillis(); } + public void setChangeDebouncingMillis( + int changeDebouncingMillis, int changeDebouncingMaxMillis) { + this.changeDebouncingMillis = changeDebouncingMillis; + this.changeDebouncingMaxMillis = changeDebouncingMaxMillis; + } + int changeDebouncingMillis; int changeDebouncingMaxMillis; int changeTaskWaitingMillis = 100; diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java new file mode 100644 index 000000000..9c2a5230b --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.SmartLifecycle; +import org.springframework.stereotype.Component; + +/** + * @author huicha + * @date 2025/7/24 + */ +@Component +public class PushEfficiencyConfigUpdater implements SmartLifecycle { + + private static final Logger LOGGER = LoggerFactory.getLogger(PushEfficiencyConfigUpdater.class); + + @Autowired private ChangeProcessor changeProcessor; + + @Autowired private PushProcessor pushProcessor; + + @Autowired private FirePushService firePushService; + + private Lock lock; + + private boolean stop; + + private boolean useAutoPushEfficiency = false; + + private AutoPushEfficiencyRegulator autoPushEfficiencyRegulator; + + public PushEfficiencyConfigUpdater() { + this.lock = new ReentrantLock(); + this.stop = false; + } + + /** + * ProviderData 中关于推送相关的配置发生了变化 + * + * @param pushEfficiencyImproveConfig + */ + public void updateFromProviderData(PushEfficiencyImproveConfig pushEfficiencyImproveConfig) { + this.lock.lock(); + try { + LOGGER.info( + "[PushEfficiencyConfigUpdater] update config from provider data: {}", + pushEfficiencyImproveConfig); + + if (this.stop) { + // 已销毁 + return; + } + + AutoPushEfficiencyConfig autoPushEfficiencyConfig = + pushEfficiencyImproveConfig.getAutoPushEfficiencyConfig(); + if (null != autoPushEfficiencyConfig + && autoPushEfficiencyConfig.isEnableAutoPushEfficiency()) { + // 新的配置中,开启了自动化配置 + this.useAutoPushEfficiency = true; + + if (null != this.autoPushEfficiencyRegulator) { + // 此时还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉 + LOGGER.info( + "[PushEfficiencyConfigUpdater] close old auto push efficiency regulator, id: {}, will create new one", + this.autoPushEfficiencyRegulator.getId()); + this.autoPushEfficiencyRegulator.close(); + } + + this.autoPushEfficiencyRegulator = + new AutoPushEfficiencyRegulator(autoPushEfficiencyConfig, this); + } else { + // 新的配置中,关闭了自动化配置,此时如果还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉 + this.useAutoPushEfficiency = false; + + if (null != this.autoPushEfficiencyRegulator) { + LOGGER.info( + "[PushEfficiencyConfigUpdater] close old auto push efficiency regulator, id: {}, will not create new one", + this.autoPushEfficiencyRegulator.getId()); + this.autoPushEfficiencyRegulator.close(); + } + + this.autoPushEfficiencyRegulator = null; + } + + // 更新一下 PushProcessor 中的 AutoPushEfficiencyRegulator,以便于统计推送次数 + this.pushProcessor.setAutoPushEfficiencyRegulator(this.autoPushEfficiencyRegulator); + + // 更新配置 + this.changeProcessor.setWorkDelayTime(pushEfficiencyImproveConfig); + this.pushProcessor.setPushTaskDelayTime(pushEfficiencyImproveConfig); + if (this.firePushService.getRegProcessor() != null) { + this.firePushService.getRegProcessor().setWorkDelayTime(pushEfficiencyImproveConfig); + } + } finally { + this.lock.unlock(); + } + } + + public void updateDebouncingTime(int debouncingTime, int maxDebouncingTime) { + this.lock.lock(); + try { + if (!this.useAutoPushEfficiency) { + // 如果已经停止使用自动化配置了,那么这里就跳过更新,以防止最终实际使用的配置不是 ProvideData 中的配置 + return; + } + this.changeProcessor.setChangeDebouncingMillis(debouncingTime, maxDebouncingTime); + } finally { + this.lock.unlock(); + } + } + + @Override + public void start() {} + + @Override + public void stop() { + // Bean 被销毁的时候需要清理释放线程资源 + this.lock.lock(); + try { + if (!this.stop) { + this.stop = true; + if (null != this.autoPushEfficiencyRegulator) { + this.autoPushEfficiencyRegulator.close(); + } + } + } finally { + this.lock.unlock(); + } + } + + @Override + public boolean isRunning() { + this.lock.lock(); + try { + return this.stop; + } finally { + this.lock.unlock(); + } + } + + @VisibleForTesting + public AutoPushEfficiencyRegulator getAutoPushEfficiencyRegulator() { + return autoPushEfficiencyRegulator; + } + + @VisibleForTesting + public void setChangeProcessor(ChangeProcessor changeProcessor) { + this.changeProcessor = changeProcessor; + } + + @VisibleForTesting + public void setPushProcessor(PushProcessor pushProcessor) { + this.pushProcessor = pushProcessor; + } + + @VisibleForTesting + public void setFirePushService(FirePushService firePushService) { + this.firePushService = firePushService; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java index a897c5f72..7a4e17fd4 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.ToStringBuilder; /** * @author jiangcun.hlc@antfin.com @@ -80,6 +81,9 @@ public class PushEfficiencyImproveConfig { /** session 处理 pushTask delay pushTaskDebouncingMillis 时间处理,可以合并相同的推送任务,避免数据连续变化触发大量推送, 默认500ms */ private int sbfAppPushTaskDebouncingMillis = DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS; + /** 自动优化的相关配置 */ + private AutoPushEfficiencyConfig autoPushEfficiencyConfig = null; + /** * 判断是否满足 三板斧灰度条件 * @@ -251,6 +255,14 @@ public void setRegWorkWake(boolean regWorkWake) { this.regWorkWake = regWorkWake; } + public AutoPushEfficiencyConfig getAutoPushEfficiencyConfig() { + return autoPushEfficiencyConfig; + } + + public void setAutoPushEfficiencyConfig(AutoPushEfficiencyConfig autoPushEfficiencyConfig) { + this.autoPushEfficiencyConfig = autoPushEfficiencyConfig; + } + public void setSessionServerConfig(SessionServerConfig sessionServerConfig) { if (null != sessionServerConfig && StringUtils.isNotBlank(sessionServerConfig.getSessionServerRegion())) { @@ -272,44 +284,6 @@ public boolean validate() { @Override public String toString() { - return "PushEfficiencyImproveConfig{" - + "CURRENT_ZONE='" - + CURRENT_ZONE - + '\'' - + ", CURRENT_IP=" - + CURRENT_IP - + ", inIpZoneSBF=" - + inIpZoneSBF() - + ", DEFAULT_CHANGE_DEBOUNCING_MILLIS=" - + DEFAULT_CHANGE_DEBOUNCING_MILLIS - + ", DEFAULT_CHANGE_DEBOUNCING_MAX_MILLIS=" - + DEFAULT_CHANGE_DEBOUNCING_MAX_MILLIS - + ", DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS=" - + DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS - + ", changeDebouncingMillis=" - + changeDebouncingMillis - + ", changeDebouncingMaxMillis=" - + changeDebouncingMaxMillis - + ", changeTaskWaitingMillis=" - + changeTaskWaitingMillis - + ", pushTaskWaitingMillis=" - + pushTaskWaitingMillis - + ", pushTaskDebouncingMillis=" - + pushTaskDebouncingMillis - + ", regWorkWaitingMillis=" - + regWorkWaitingMillis - + ", ipSet=" - + ipSet - + ", zoneSet=" - + zoneSet - + ", subAppSet=" - + subAppSet - + ", sbfAppPushTaskDebouncingMillis=" - + sbfAppPushTaskDebouncingMillis - + ", pushTaskWake=" - + pushTaskWake - + ", regWorkWake=" - + regWorkWake - + '}'; + return ToStringBuilder.reflectionToString(this); } } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java index 6f4ac35a2..d287269f1 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java @@ -67,7 +67,9 @@ public class PushProcessor { @Autowired protected ClientNodeService clientNodeService; - @Autowired protected CircuitBreakerService circuitBreakerService;; + @Autowired protected CircuitBreakerService circuitBreakerService; + + private volatile AutoPushEfficiencyRegulator autoPushEfficiencyRegulator; private int pushDataTaskDebouncingMillis = 500; private PushEfficiencyImproveConfig pushEfficiencyImproveConfig; @@ -97,6 +99,11 @@ public void setPushTaskDelayTime(PushEfficiencyImproveConfig pushEfficiencyImpro this.pushEfficiencyImproveConfig = pushEfficiencyImproveConfig; } + public void setAutoPushEfficiencyRegulator( + AutoPushEfficiencyRegulator autoPushEfficiencyRegulator) { + this.autoPushEfficiencyRegulator = autoPushEfficiencyRegulator; + } + void intTaskBuffer() { if (this.taskBuffer == null) { this.taskBuffer = new PushTaskBuffer(sessionServerConfig.getPushTaskBufferBucketSize()); @@ -340,6 +347,13 @@ boolean doPush(PushTask task) { return false; } + // 如果需要则进行推送计数,以便于可以自动化调整攒批 + AutoPushEfficiencyRegulator currentAutoPushEfficiencyRegulator = + this.autoPushEfficiencyRegulator; + if (null != currentAutoPushEfficiencyRegulator) { + currentAutoPushEfficiencyRegulator.safeIncrementPushCount(); + } + pushingRecords.put( task.pushingTaskKey, new PushRecord( diff --git a/server/server/session/src/main/resources/log4j2.xml b/server/server/session/src/main/resources/log4j2.xml index c858270d8..3effc1855 100644 --- a/server/server/session/src/main/resources/log4j2.xml +++ b/server/server/session/src/main/resources/log4j2.xml @@ -841,6 +841,21 @@ + + + + + + + + + + + + + + @@ -1121,13 +1136,18 @@ + + + + + + - diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigServiceTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigServiceTest.java index 1f37b1ccd..d59737fb4 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigServiceTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigServiceTest.java @@ -22,9 +22,7 @@ import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.metaserver.ProvideData; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.session.push.ChangeProcessor; -import com.alipay.sofa.registry.server.session.push.FirePushService; -import com.alipay.sofa.registry.server.session.push.PushProcessor; +import com.alipay.sofa.registry.server.session.push.PushEfficiencyConfigUpdater; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,9 +39,7 @@ public class FetchPushEfficiencyConfigServiceTest { public void beforeTest() { fetchPushEfficiencyConfigService = new FetchPushEfficiencyConfigService(); fetchPushEfficiencyConfigService - .setPushProcessor(mock(PushProcessor.class)) - .setChangeProcessor(mock(ChangeProcessor.class)) - .setFirePushService(mock(FirePushService.class)) + .setPushEfficiencyConfigUpdater(mock(PushEfficiencyConfigUpdater.class)) .setSessionServerConfig(mock(SessionServerConfig.class)); } diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java new file mode 100644 index 000000000..437434fa3 --- /dev/null +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * @author huicha + * @date 2025/7/24 + */ +public class AutoPushEfficiencyRegulatorTest { + + @Test + public void test() throws InterruptedException { + AutoPushEfficiencyConfig autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + // 开启自动化配置 + autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); + autoPushEfficiencyConfig.setEnableDebouncingTime(true); + autoPushEfficiencyConfig.setEnableMaxDebouncingTime(true); + + // 总推送数超过 10 笔就触发增加攒批时长 + autoPushEfficiencyConfig.setPushCountThreshold(10); + + autoPushEfficiencyConfig.setWindowNum(10); + autoPushEfficiencyConfig.setWindowTimeMillis(100); + + MockPushEfficiencyConfigUpdater mockPushEfficiencyConfigUpdater = + new MockPushEfficiencyConfigUpdater(); + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater = + Mockito.mock(PushEfficiencyConfigUpdater.class, mockPushEfficiencyConfigUpdater); + + AutoPushEfficiencyRegulator autoPushEfficiencyRegulator = + new AutoPushEfficiencyRegulator(autoPushEfficiencyConfig, pushEfficiencyConfigUpdater); + + try { + for (int loop = 0; loop < 40; loop++) { + for (int i = 0; i < 5; i++) { + autoPushEfficiencyRegulator.safeIncrementPushCount(); + } + Thread.sleep(50); + } + + int debouncingTime = mockPushEfficiencyConfigUpdater.getDebouncingTime(); + int maxDebouncingTime = mockPushEfficiencyConfigUpdater.getMaxDebouncingTime(); + Assert.assertEquals( + debouncingTime, 1000 /*AutoPushEfficiencyConfig.DEFAULT_DEBOUNCING_TIME_MAX*/); + Assert.assertEquals( + maxDebouncingTime, 3000 /*AutoPushEfficiencyConfig.DEFAULT_MAX_DEBOUNCING_TIME_MAX*/); + + Thread.sleep(2000); + + debouncingTime = mockPushEfficiencyConfigUpdater.getDebouncingTime(); + maxDebouncingTime = mockPushEfficiencyConfigUpdater.getMaxDebouncingTime(); + Assert.assertEquals( + debouncingTime, 100 /*AutoPushEfficiencyConfig.DEFAULT_DEBOUNCING_TIME_MIN*/); + Assert.assertEquals( + maxDebouncingTime, 1000 /*AutoPushEfficiencyConfig.DEFAULT_MAX_DEBOUNCING_TIME_MIN*/); + } finally { + autoPushEfficiencyRegulator.close(); + } + } +} + +class MockPushEfficiencyConfigUpdater implements Answer { + + private int debouncingTime; + + private int maxDebouncingTime; + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + this.debouncingTime = invocation.getArgumentAt(0, Integer.class); + this.maxDebouncingTime = invocation.getArgumentAt(1, Integer.class); + return null; + } + + public int getDebouncingTime() { + return debouncingTime; + } + + public void setDebouncingTime(int debouncingTime) { + this.debouncingTime = debouncingTime; + } + + public int getMaxDebouncingTime() { + return maxDebouncingTime; + } + + public void setMaxDebouncingTime(int maxDebouncingTime) { + this.maxDebouncingTime = maxDebouncingTime; + } +} diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java new file mode 100644 index 000000000..14829c9b4 --- /dev/null +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * @author huicha + * @date 2025/7/24 + */ +public class PushEfficiencyConfigUpdaterTest { + + @Test + public void testUpdateFromProviderData() { + ChangeProcessor changeProcessor = Mockito.mock(ChangeProcessor.class); + PushProcessor pushProcessor = Mockito.mock(PushProcessor.class); + FirePushService firePushService = Mockito.mock(FirePushService.class); + + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater = new PushEfficiencyConfigUpdater(); + pushEfficiencyConfigUpdater.setChangeProcessor(changeProcessor); + pushEfficiencyConfigUpdater.setPushProcessor(pushProcessor); + pushEfficiencyConfigUpdater.setFirePushService(firePushService); + + // 更新没有开启自动化配置,因此预期是 null + pushEfficiencyConfigUpdater.updateFromProviderData(new PushEfficiencyImproveConfig()); + + AutoPushEfficiencyRegulator autoPushEfficiencyRegulator = + pushEfficiencyConfigUpdater.getAutoPushEfficiencyRegulator(); + Assert.assertNull(autoPushEfficiencyRegulator); + + // 第二次开启自动化配置 + AutoPushEfficiencyConfig autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); + autoPushEfficiencyConfig.setEnableDebouncingTime(true); + autoPushEfficiencyConfig.setEnableMaxDebouncingTime(true); + autoPushEfficiencyConfig.setWindowTimeMillis(10); + + PushEfficiencyImproveConfig pushEfficiencyImproveConfig = new PushEfficiencyImproveConfig(); + pushEfficiencyImproveConfig.setAutoPushEfficiencyConfig(autoPushEfficiencyConfig); + + pushEfficiencyConfigUpdater.updateFromProviderData(pushEfficiencyImproveConfig); + + autoPushEfficiencyRegulator = pushEfficiencyConfigUpdater.getAutoPushEfficiencyRegulator(); + Assert.assertNotNull(autoPushEfficiencyRegulator); + Long autoPushEfficiencyRegulatorId = autoPushEfficiencyRegulator.getId(); + Assert.assertEquals(1L, (long) autoPushEfficiencyRegulatorId); + + // 第三次仍然开启,但是我们修改一部分配置 + autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); + autoPushEfficiencyConfig.setEnableDebouncingTime(true); + autoPushEfficiencyConfig.setEnableMaxDebouncingTime(true); + autoPushEfficiencyConfig.setWindowTimeMillis(10); + autoPushEfficiencyConfig.setWindowNum(3); + autoPushEfficiencyConfig.setPushCountThreshold(10); + + pushEfficiencyImproveConfig = new PushEfficiencyImproveConfig(); + pushEfficiencyImproveConfig.setAutoPushEfficiencyConfig(autoPushEfficiencyConfig); + + pushEfficiencyConfigUpdater.updateFromProviderData(pushEfficiencyImproveConfig); + + AutoPushEfficiencyRegulator newAutoPushEfficiencyRegulator = + pushEfficiencyConfigUpdater.getAutoPushEfficiencyRegulator(); + Assert.assertNotNull(newAutoPushEfficiencyRegulator); + + Long newAutoPushEfficiencyRegulatorId = newAutoPushEfficiencyRegulator.getId(); + int windowNum = newAutoPushEfficiencyRegulator.getWindowNum(); + int windowSize = newAutoPushEfficiencyRegulator.getWindowsSize(); + long pushCountThreshold = newAutoPushEfficiencyRegulator.getPushCountThreshold(); + + Assert.assertEquals(2L, (long) newAutoPushEfficiencyRegulatorId); + Assert.assertEquals(3, windowNum); + Assert.assertEquals(3, windowSize); + Assert.assertEquals(10, pushCountThreshold); + + // 此时之前的 AutoPushEfficiencyRegulator 应当为关闭状态 + Assert.assertTrue(autoPushEfficiencyRegulator.isClosed()); + + // 清理释放线程资源 + pushEfficiencyConfigUpdater.stop(); + + // 新的 AutoPushEfficiencyRegulator 也应当为关闭状态 + Assert.assertTrue(newAutoPushEfficiencyRegulator.isClosed()); + } +} diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/SessionInterestsTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/SessionInterestsTest.java index 02406e9e3..65f9cdbdb 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/SessionInterestsTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/SessionInterestsTest.java @@ -16,19 +16,14 @@ */ package com.alipay.sofa.registry.server.session.store; -import com.alipay.sofa.registry.common.model.ElementType; import com.alipay.sofa.registry.common.model.dataserver.DatumVersion; import com.alipay.sofa.registry.common.model.sessionserver.SubscriberCountByApp; -import com.alipay.sofa.registry.common.model.store.BaseInfo; import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Subscriber; -import com.alipay.sofa.registry.common.model.store.URL; -import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.server.session.AbstractSessionServerTestBase; import com.alipay.sofa.registry.server.session.registry.SessionRegistry.SelectSubscriber; import com.google.common.collect.Maps; import com.google.common.collect.Sets; - import java.util.*; import java.util.Map.Entry; import org.junit.Assert; @@ -221,7 +216,8 @@ public void testGetInterestsByOption() { String testOptionDataId = "test-option-data-id"; String testOptionGroup = "test-option-group"; String testOptionInstanceId = "test-option-instance-id"; - String testOptionDataInfoId = DataInfo.toDataInfoId(testOptionDataId, testOptionInstanceId, testOptionGroup); + String testOptionDataInfoId = + DataInfo.toDataInfoId(testOptionDataId, testOptionInstanceId, testOptionGroup); String testOptionAppNameOne = "test-option-app-1"; String testOptionAppNameTwo = "test-option-app-2"; int appOneNum = 3; @@ -229,20 +225,25 @@ public void testGetInterestsByOption() { Map appOneSubscribers = new HashMap<>(); for (int i = 0; i < appOneNum; i++) { - Subscriber subscriber = createSubscriber(testOptionDataId, testOptionGroup, testOptionInstanceId, testOptionAppNameOne); + Subscriber subscriber = + createSubscriber( + testOptionDataId, testOptionGroup, testOptionInstanceId, testOptionAppNameOne); appOneSubscribers.put(subscriber.getRegisterId(), subscriber); this.interests.add(subscriber); } Map appTwoSubscribers = new HashMap<>(); for (int i = 0; i < appTwoNum; i++) { - Subscriber subscriber = createSubscriber(testOptionDataId, testOptionGroup, testOptionInstanceId, testOptionAppNameTwo); + Subscriber subscriber = + createSubscriber( + testOptionDataId, testOptionGroup, testOptionInstanceId, testOptionAppNameTwo); appTwoSubscribers.put(subscriber.getRegisterId(), subscriber); this.interests.add(subscriber); } // 预期可以查询出全部的 app-1 - Collection subscribersResult1 = this.interests.getInterestsByOption(testOptionDataInfoId, testOptionAppNameOne, appOneNum); + Collection subscribersResult1 = + this.interests.getInterestsByOption(testOptionDataInfoId, testOptionAppNameOne, appOneNum); Assert.assertEquals(subscribersResult1.size(), appOneNum); // size 相同且每个元素都存在,那么就证明是 Assert.assertEquals(subscribersResult1.size(), appOneSubscribers.size()); @@ -253,7 +254,8 @@ public void testGetInterestsByOption() { } // 预期可以查询出 1 个 app-1 - Collection subscribersResult2 = this.interests.getInterestsByOption(testOptionDataInfoId, testOptionAppNameOne, 1); + Collection subscribersResult2 = + this.interests.getInterestsByOption(testOptionDataInfoId, testOptionAppNameOne, 1); Assert.assertEquals(subscribersResult2.size(), 1); Subscriber oneOfSubscribers2 = subscribersResult2.iterator().next(); Assert.assertEquals(oneOfSubscribers2.getAppName(), testOptionAppNameOne); @@ -261,11 +263,16 @@ public void testGetInterestsByOption() { Assert.assertTrue(appOneSubscribers.containsKey(oneOfSubscribers2.getRegisterId())); // 不设置任何查询条件,预期可以查询出全部的 app-1 和 app-2 - Collection subscribersResult3 = this.interests.getInterestsByOption(testOptionDataInfoId, null, 0); + Collection subscribersResult3 = + this.interests.getInterestsByOption(testOptionDataInfoId, null, 0); Assert.assertEquals(subscribersResult3.size(), appOneNum + appTwoNum); for (Subscriber subscriber : subscribersResult3) { - Assert.assertTrue(appOneSubscribers.containsKey(subscriber.getRegisterId()) || appTwoSubscribers.containsKey(subscriber.getRegisterId())); - Assert.assertTrue(subscriber.getAppName().equals(testOptionAppNameOne) || subscriber.getAppName().equals(testOptionAppNameTwo)); + Assert.assertTrue( + appOneSubscribers.containsKey(subscriber.getRegisterId()) + || appTwoSubscribers.containsKey(subscriber.getRegisterId())); + Assert.assertTrue( + subscriber.getAppName().equals(testOptionAppNameOne) + || subscriber.getAppName().equals(testOptionAppNameTwo)); Assert.assertEquals(subscriber.getDataInfoId(), testOptionDataInfoId); } } @@ -275,23 +282,29 @@ public void testGetSubscriberCountByApp() { String testCountDataId = "test-count-data-id"; String testCountGroup = "test-count-group"; String testCountInstanceId = "test-count-instance-id"; - String testCountDataInfoId = DataInfo.toDataInfoId(testCountDataId, testCountInstanceId, testCountGroup); + String testCountDataInfoId = + DataInfo.toDataInfoId(testCountDataId, testCountInstanceId, testCountGroup); String testCountAppNameOne = "test-count-app-1"; String testCountAppNameTwo = "test-count-app-2"; int appOneNum = 3; int appTwoNum = 2; for (int i = 0; i < appOneNum; i++) { - Subscriber subscriber = createSubscriber(testCountDataId, testCountGroup, testCountInstanceId, testCountAppNameOne); + Subscriber subscriber = + createSubscriber( + testCountDataId, testCountGroup, testCountInstanceId, testCountAppNameOne); this.interests.add(subscriber); } for (int i = 0; i < appTwoNum; i++) { - Subscriber subscriber = createSubscriber(testCountDataId, testCountGroup, testCountInstanceId, testCountAppNameTwo); + Subscriber subscriber = + createSubscriber( + testCountDataId, testCountGroup, testCountInstanceId, testCountAppNameTwo); this.interests.add(subscriber); } - List subscriberCountByApps = this.interests.getSubscriberCountByApp(testCountDataInfoId); + List subscriberCountByApps = + this.interests.getSubscriberCountByApp(testCountDataInfoId); Assert.assertEquals(subscriberCountByApps.size(), 2); for (SubscriberCountByApp subscriberCountByApp : subscriberCountByApps) { @@ -303,14 +316,14 @@ public void testGetSubscriberCountByApp() { } } - private Subscriber createSubscriber(String dataInfo, String group, String instanceId, String appName) { + private Subscriber createSubscriber( + String dataInfo, String group, String instanceId, String appName) { Subscriber subscriber = this.randomSubscriber(dataInfo, instanceId); subscriber.setAppName(appName); subscriber.setGroup(group); subscriber.setDataInfoId( - DataInfo.toDataInfoId( - subscriber.getDataId(), subscriber.getInstanceId(), subscriber.getGroup())); + DataInfo.toDataInfoId( + subscriber.getDataId(), subscriber.getInstanceId(), subscriber.getGroup())); return subscriber; } - } diff --git a/server/server/shared/pom.xml b/server/server/shared/pom.xml index 83ffd6452..5bf85e5de 100644 --- a/server/server/shared/pom.xml +++ b/server/server/shared/pom.xml @@ -5,7 +5,7 @@ registry-server com.alipay.sofa - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT 4.0.0 diff --git a/server/store/api/pom.xml b/server/store/api/pom.xml index c78c6a09b..708c72970 100644 --- a/server/store/api/pom.xml +++ b/server/store/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-store - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/server/store/jdbc/pom.xml b/server/store/jdbc/pom.xml index 6e885e7be..a6b2fef81 100644 --- a/server/store/jdbc/pom.xml +++ b/server/store/jdbc/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-store - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml diff --git a/server/store/jraft/pom.xml b/server/store/jraft/pom.xml index 33b39f945..83ebe9164 100644 --- a/server/store/jraft/pom.xml +++ b/server/store/jraft/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-store - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml diff --git a/server/store/pom.xml b/server/store/pom.xml index ca0764cf8..e3850c84b 100644 --- a/server/store/pom.xml +++ b/server/store/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 diff --git a/test/pom.xml b/test/pom.xml index 5b7720792..91069b453 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator-SNAPSHOT ../pom.xml 4.0.0 From b98142f6b46e10f6284b33af19cf3db9666bf709 Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Tue, 5 Aug 2025 19:47:20 +0800 Subject: [PATCH 02/10] =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E9=80=82?= =?UTF-8?q?=E5=BA=94=E5=BC=80=E5=85=B3=E6=B5=81=E9=99=90=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../push/AutoPushEfficiencyConfig.java | 22 +++ .../push/AutoPushEfficiencyRegulator.java | 167 +++++++++++++++--- .../push/PushEfficiencyConfigUpdater.java | 21 +++ .../resource/ClientManagerResource.java | 34 ++++ 4 files changed, 219 insertions(+), 25 deletions(-) diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java index 2e1396234..376c86419 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java @@ -43,6 +43,8 @@ public class AutoPushEfficiencyConfig { private static final int DEFAULT_MAX_DEBOUNCING_TIME_STEP = 200; + private static final double DEFAULT_LOAD_THRESHOLD = 6; + private boolean enableAutoPushEfficiency = false; private int windowNum = DEFAULT_WINDOW_NUM; @@ -51,6 +53,7 @@ public class AutoPushEfficiencyConfig { private long pushCountThreshold = DEFAULT_PUSH_COUNT_THRESHOLD; + // == 攒批时长参数 == // 启动攒批时长的自动化调整 private boolean enableDebouncingTime = false; @@ -76,6 +79,13 @@ public class AutoPushEfficiencyConfig { // 最大调整攒批时长的步长 private int maxDebouncingTimeStep = DEFAULT_MAX_DEBOUNCING_TIME_STEP; + // == 攒批时长参数 == + + // == 开关流限流参数 == + private boolean enableTrafficOperateLimitSwitch = false; + + private double loadThreshold = DEFAULT_LOAD_THRESHOLD; + // == 开关流限流参数 == public boolean isEnableAutoPushEfficiency() { return enableAutoPushEfficiency; @@ -173,6 +183,18 @@ public void setMaxDebouncingTimeStep(int maxDebouncingTimeStep) { this.maxDebouncingTimeStep = maxDebouncingTimeStep; } + public boolean isEnableTrafficOperateLimitSwitch() { + return enableTrafficOperateLimitSwitch; + } + + public double getLoadThreshold() { + return loadThreshold; + } + + public void setLoadThreshold(double loadThreshold) { + this.loadThreshold = loadThreshold; + } + @Override public String toString() { return ToStringBuilder.reflectionToString(this); diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java index 38dbfdef0..c07de7595 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java @@ -21,6 +21,8 @@ import com.alipay.sofa.registry.util.ConcurrentUtils; import com.alipay.sofa.registry.util.LoopRunnable; import com.google.common.annotations.VisibleForTesting; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -52,6 +54,10 @@ public class AutoPushEfficiencyRegulator extends LoopRunnable { // 阈值; 推送次数高于这个阈值的时候会开始逐渐调整攒批配置 private final long pushCountThreshold; + // 阈值; 用于调整开关流限流开关的 Load 阈值。 + // 不影响推送攒批配置 + private final double loadThreshold; + // 预热次数,等到所有的窗口都轮换过一遍之后才能开始统计 // 这里因为 warmupTimes 的值总是单线程读写的,因此没有加 volatile 关键字 private int warmupTimes; @@ -62,12 +68,20 @@ public class AutoPushEfficiencyRegulator extends LoopRunnable { // 推送效率配置更新器 private final PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater; + // 采集系统负载指标 + private final OperatingSystemMXBean operatingSystemMXBean; + // 攒批时长 private final IntMetric debouncingTime; // 最大攒批时长 private final IntMetric maxDebouncingTime; + // 开关流限流 + // 因为命名为 enable traffic operate 在这里可能会有些歧义 + // 可能会有人理解成 load 过高时是否操作开关流限流,因此这里命名的比较长 + private final BooleanMetric trafficOperateLimitSwitch; + public AutoPushEfficiencyRegulator( AutoPushEfficiencyConfig autoPushEfficiencyConfig, PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater) { @@ -83,8 +97,10 @@ public AutoPushEfficiencyRegulator( // 设置其他参数 this.id = ID_GENERATOR.incrementAndGet(); this.pushCountThreshold = autoPushEfficiencyConfig.getPushCountThreshold(); + this.loadThreshold = autoPushEfficiencyConfig.getLoadThreshold(); this.warmupTimes = 0; this.pushEfficiencyConfigUpdater = pushEfficiencyConfigUpdater; + this.operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); // 初始化可能需要调整的指标 this.debouncingTime = @@ -99,6 +115,8 @@ public AutoPushEfficiencyRegulator( autoPushEfficiencyConfig.getMaxDebouncingTimeMax(), autoPushEfficiencyConfig.getMaxDebouncingTimeMin(), autoPushEfficiencyConfig.getMaxDebouncingTimeStep()); + this.trafficOperateLimitSwitch = + new BooleanMetric(autoPushEfficiencyConfig.isEnableTrafficOperateLimitSwitch()); // 启动定时任务 ConcurrentUtils.createDaemonThread("AutoPushEfficiencyRegulator-" + this.id, this).start(); @@ -134,11 +152,16 @@ private void rollWindow() { this.index.set(newIndex); } - private boolean checkPushCountIsHigh() { + private long computeTotalPushCount() { long totalPushCount = 0; for (int forIndex = 0; forIndex < this.windows.length; forIndex++) { totalPushCount += this.windows[forIndex].get(); } + return totalPushCount; + } + + private boolean checkPushCountIsHigh() { + long totalPushCount = this.computeTotalPushCount(); return totalPushCount > this.pushCountThreshold; } @@ -154,6 +177,12 @@ private void updateDebouncingTime(String tag) { this.pushEfficiencyConfigUpdater.updateDebouncingTime(debouncingTime, maxDebouncingTime); } + private void updateTrafficOperateLimitSwitch() { + boolean trafficOperateLimitSwitch = this.trafficOperateLimitSwitch.load(); + LOGGER.info("[ID: {}] trafficOperateLimitSwitch: {}", this.id, trafficOperateLimitSwitch); + this.pushEfficiencyConfigUpdater.updateTrafficOperateLimitSwitch(trafficOperateLimitSwitch); + } + public Long getId() { return id; } @@ -174,43 +203,87 @@ public void runUnthrowable() { } // 2. 已经完成预热了,检查推送频率是否过高 - if (this.checkPushCountIsHigh()) { - // 推送频率过高,尝试更新攒批时长 - boolean dataChange = false; + boolean pushCountIsHigh = this.checkPushCountIsHigh(); - if (debouncingTime.tryIncrement()) { - dataChange = true; - } + // 3. 根据推送频率调整推送配置 + this.tryUpdatePushConfig(pushCountIsHigh); + + // 4. 根据推送频率以及负载情况调整开关流限流配置 + this.tryUpdateTrafficOperateLimitSwitch(pushCountIsHigh); + + // 3. 滚动窗口 + // 这里放到最后滚动窗口是因为: + // 滚动窗口时,会把最新的窗口计数清零,如果先滚动后检查推送频率, + // 那么感知到的推送频率就会偏小一点 + this.rollWindow(); + } + + private void tryUpdateTrafficOperateLimitSwitch(boolean pushCountIsHigh) { + if (!this.trafficOperateLimitSwitch.isEnable()) { + // 如果没有开启支持操作开关流,那么就不执行后续的代码了,尽量尝试避免获取系统负载 + return; + } - if (maxDebouncingTime.tryIncrement()) { - dataChange = true; + // 这里获取到的是过去一分钟的负载平均值,这个值有可能小于 0,小于 0 时表示无法获取平均负载 + // 另外,这个方法的注释上写了这个方法设计上就会考虑可能较频繁调用,因此这里先不考虑做限制了 + double loadAverage = this.operatingSystemMXBean.getSystemLoadAverage(); + if (loadAverage < 0) { + return; + } + + boolean loadIsHigh = loadAverage > loadThreshold; + + if (pushCountIsHigh && loadIsHigh) { + if (this.trafficOperateLimitSwitch.tryTurnOn()) { + this.updateTrafficOperateLimitSwitch(); + } + } else { + if (this.trafficOperateLimitSwitch.tryTurnOff()) { + this.updateTrafficOperateLimitSwitch(); } + } + } - if (dataChange) { + private void tryUpdatePushConfig(boolean pushCountIsHigh) { + if (pushCountIsHigh) { + // 推送频率过高,尝试更新攒批时长 + if (this.tryIncrementPushConfig()) { this.updateDebouncingTime("Increment"); } } else { // 推送频率正常,此时尝试逐渐降低攒批时长 - boolean dataChange = false; - - if (debouncingTime.tryDecrement()) { - dataChange = true; + if (this.tryDecrementPushConfig()) { + this.updateDebouncingTime("Decrement"); } + } + } - if (maxDebouncingTime.tryDecrement()) { - dataChange = true; - } + private boolean tryIncrementPushConfig() { + boolean dataChange = false; - if (dataChange) { - this.updateDebouncingTime("Decrement"); - } + if (debouncingTime.tryIncrement()) { + dataChange = true; } - // 3. 滚动窗口 - // 这里放到最后滚动窗口是因为: - // 滚动窗口时,会把最新的窗口计数清零,如果先滚动后检查推送频率, - // 那么感知到的推送频率就会偏小一点 - this.rollWindow(); + if (maxDebouncingTime.tryIncrement()) { + dataChange = true; + } + + return dataChange; + } + + private boolean tryDecrementPushConfig() { + boolean dataChange = false; + + if (debouncingTime.tryDecrement()) { + dataChange = true; + } + + if (maxDebouncingTime.tryDecrement()) { + dataChange = true; + } + + return dataChange; } @Override @@ -299,3 +372,47 @@ public int load() { return this.current; } } + +class BooleanMetric { + + private final boolean enable; + + private boolean current; + + public BooleanMetric(boolean enable) { + this.enable = enable; + this.current = false; + } + + public boolean tryTurnOn() { + if (!this.enable) { + return false; + } + if (this.current) { + return false; + } else { + this.current = true; + return true; + } + } + + public boolean tryTurnOff() { + if (!this.enable) { + return false; + } + if (this.current) { + this.current = false; + return true; + } else { + return false; + } + } + + public boolean isEnable() { + return this.enable; + } + + public boolean load() { + return this.current; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java index 9c2a5230b..3101f7634 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.registry.server.session.push; +import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -40,6 +41,8 @@ public class PushEfficiencyConfigUpdater implements SmartLifecycle { @Autowired private FirePushService firePushService; + @Autowired private ClientManagerResource clientManagerResource; + private Lock lock; private boolean stop; @@ -110,6 +113,9 @@ public void updateFromProviderData(PushEfficiencyImproveConfig pushEfficiencyImp if (this.firePushService.getRegProcessor() != null) { this.firePushService.getRegProcessor().setWorkDelayTime(pushEfficiencyImproveConfig); } + + // 无论如何,先关闭掉限流 + this.clientManagerResource.setEnableTrafficOperate(true); } finally { this.lock.unlock(); } @@ -128,6 +134,21 @@ public void updateDebouncingTime(int debouncingTime, int maxDebouncingTime) { } } + public void updateTrafficOperateLimitSwitch(boolean trafficOperateLimitSwitch) { + this.lock.lock(); + try { + if (!this.useAutoPushEfficiency) { + // 如果已经停止使用自动化配置了,那么这里就跳过更新,以防止最终实际使用的配置不是 ProvideData 中的配置 + return; + } + + // 打开限制开关,意味着开启了限流,也就是不允许操作开关流,因此这里是反的 + this.clientManagerResource.setEnableTrafficOperate(!trafficOperateLimitSwitch); + } finally { + this.lock.unlock(); + } + } + @Override public void start() {} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java index b9fde3322..2d429abca 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java @@ -78,6 +78,8 @@ public class ClientManagerResource { @Autowired protected ExecutorManager executorManager; + private volatile boolean enableTrafficOperate = true; + /** * Client off * @@ -90,6 +92,12 @@ public CommonResponse clientOff(@FormParam("ips") String ips) { if (StringUtils.isEmpty(ips)) { return CommonResponse.buildFailedResponse("ips is empty"); } + + if (!this.enableTrafficOperate) { + // 限流,不允许操作开关流 + return CommonResponse.buildFailedResponse("too many request"); + } + final Set ipSet = CollectionSdks.toIpSet(ips); List conIds = connectionsService.getIpConnects(ipSet); sessionRegistry.clientOff(conIds); @@ -109,6 +117,12 @@ public CommonResponse clientOn(@FormParam("ips") String ips) { if (StringUtils.isEmpty(ips)) { return CommonResponse.buildFailedResponse("ips is empty"); } + + if (!this.enableTrafficOperate) { + // 限流,不允许操作开关流 + return CommonResponse.buildFailedResponse("too many request"); + } + final List ipList = CollectionSdks.toIpList(ips); List conIds = connectionsService.closeIpConnects(ipList); LOGGER.info("clientOn ips={}, conIds={}", ips, conIds); @@ -128,6 +142,12 @@ public CommonResponse clientOffInZone(@FormParam("ips") String ips) { if (StringUtils.isEmpty(ips)) { return CommonResponse.buildFailedResponse("ips is empty"); } + + if (!this.enableTrafficOperate) { + // 限流,不允许操作开关流 + return CommonResponse.buildFailedResponse("too many request"); + } + CommonResponse resp = clientOff(ips); if (!resp.isSuccess()) { return resp; @@ -164,6 +184,12 @@ public CommonResponse clientOnInZone(@FormParam("ips") String ips) { if (StringUtils.isEmpty(ips)) { return CommonResponse.buildFailedResponse("ips is empty"); } + + if (!this.enableTrafficOperate) { + // 限流,不允许操作开关流 + return CommonResponse.buildFailedResponse("too many request"); + } + CommonResponse resp = clientOn(ips); if (!resp.isSuccess()) { return resp; @@ -238,4 +264,12 @@ public Map connectionMapper() { public List getOtherConsoleServersCurrentZone() { return Sdks.getOtherConsoleServers(null, sessionServerConfig, metaServerService); } + + public boolean isEnableTrafficOperate() { + return enableTrafficOperate; + } + + public void setEnableTrafficOperate(boolean enableTrafficOperate) { + this.enableTrafficOperate = enableTrafficOperate; + } } From 375b2f1da2f7016229dad82b7db6f4fd22ff8a3d Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Fri, 29 Aug 2025 15:25:04 +0800 Subject: [PATCH 03/10] Extend the Meta Handler and add a method to query all multi-datacenter synchronization configurations. --- .../meta/bootstrap/MetaServerBootstrap.java | 21 +++++++++++++---- .../resource/MultiClusterSyncResource.java | 23 ++++++++++++------- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java index a08a66f5a..70fb27f71 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java @@ -42,14 +42,13 @@ import com.github.rholder.retry.WaitStrategies; import com.google.common.base.Predicate; import java.lang.annotation.Annotation; -import java.util.Collection; -import java.util.Date; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Resource; import javax.ws.rs.Path; import javax.ws.rs.ext.Provider; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.glassfish.jersey.server.ResourceConfig; import org.springframework.beans.factory.annotation.Autowired; @@ -236,12 +235,22 @@ private void renewNode() { private void openSessionRegisterServer() { try { if (rpcServerForSessionStarted.compareAndSet(false, true)) { + List mergedSessionServerHandlers = + new ArrayList<>(this.sessionServerHandlers); + + Collection customSessionServerHandlers = + this.customSessionServerHandlers(); + if (CollectionUtils.isNotEmpty(customSessionServerHandlers)) { + mergedSessionServerHandlers.addAll(this.customSessionServerHandlers()); + } + sessionServer = boltExchange.open( new URL( NetUtil.getLocalAddress().getHostAddress(), metaServerConfig.getSessionServerPort()), - sessionServerHandlers.toArray(new ChannelHandler[sessionServerHandlers.size()])); + mergedSessionServerHandlers.toArray( + new ChannelHandler[mergedSessionServerHandlers.size()])); LOGGER.info( "Open session node register server port {} success!", @@ -257,6 +266,10 @@ private void openSessionRegisterServer() { } } + protected Collection customSessionServerHandlers() { + return Collections.emptyList(); + } + private void openDataRegisterServer() { try { if (rpcServerForDataStarted.compareAndSet(false, true)) { diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java index 1583b995b..a41c06836 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java @@ -26,18 +26,15 @@ import com.alipay.sofa.registry.store.api.meta.MultiClusterSyncRepository; import com.alipay.sofa.registry.util.StringFormatter; import com.google.common.collect.Sets; -import java.util.Locale; -import javax.ws.rs.FormParam; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.math.NumberUtils; import org.springframework.beans.factory.annotation.Autowired; +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import java.util.Locale; +import java.util.Set; + /** * @author xiaojian.xj * @version : RecoverConfigResource.java, v 0.1 2021年09月25日 00:02 xiaojian.xj Exp $ @@ -66,6 +63,16 @@ public GenericResponse query( return response; } + @GET + @Path("queryAll") + @Produces(MediaType.APPLICATION_JSON) + public GenericResponse> queryAll() { + GenericResponse> response = new GenericResponse(); + Set queryResult = multiClusterSyncRepository.queryLocalSyncInfos(); + response.fillSucceed(queryResult); + return response; + } + @POST @Path("/save") @Produces(MediaType.APPLICATION_JSON) From 76e1e98f1086059587ba360fff6ff5aa38ef14af Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Tue, 2 Sep 2025 19:29:24 +0800 Subject: [PATCH 04/10] add new interface --- .../resource/MultiClusterSyncResource.java | 104 +++++++++++++++++- 1 file changed, 99 insertions(+), 5 deletions(-) diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java index a41c06836..0fe33da97 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java @@ -26,15 +26,14 @@ import com.alipay.sofa.registry.store.api.meta.MultiClusterSyncRepository; import com.alipay.sofa.registry.util.StringFormatter; import com.google.common.collect.Sets; +import java.util.Locale; +import java.util.Set; +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.math.NumberUtils; import org.springframework.beans.factory.annotation.Autowired; -import javax.ws.rs.*; -import javax.ws.rs.core.MediaType; -import java.util.Locale; -import java.util.Set; - /** * @author xiaojian.xj * @version : RecoverConfigResource.java, v 0.1 2021年09月25日 00:02 xiaojian.xj Exp $ @@ -524,4 +523,99 @@ public CommonResponse removeConfig( response.setSuccess(ret > 0); return response; } + + @POST + @Path("/sync/ignoreDataInfoIds/add") + @Produces(MediaType.APPLICATION_JSON) + public CommonResponse addIgnoreSyncDataInfoIds( + @FormParam("remoteDataCenter") String remoteDataCenter, + @FormParam("ignoreDataInfoIds") String ignoreDataInfoIds, + @FormParam("token") String token, + @FormParam("expectVersion") String expectVersion) { + if (!AuthChecker.authCheck(token)) { + LOG.error( + "add ignoreDataInfoIds, remoteDataCenter={}, ignoreDataInfoIds={}, auth check={} fail!", + remoteDataCenter, + ignoreDataInfoIds, + token); + return GenericResponse.buildFailedResponse("auth check fail"); + } + + if (StringUtils.isBlank(remoteDataCenter) + || StringUtils.isBlank(ignoreDataInfoIds) + || StringUtils.isBlank(expectVersion)) { + return CommonResponse.buildFailedResponse( + "remoteDataCenter, ignoreDataInfoIds, expectVersion is not allow empty."); + } + + MultiClusterSyncInfo exist = multiClusterSyncRepository.query(remoteDataCenter); + + if (exist == null || exist.getDataVersion() != Long.parseLong(expectVersion)) { + return CommonResponse.buildFailedResponse( + StringFormatter.format( + "remoteDataCenter:{}, expectVersion:{} not exist.", remoteDataCenter, expectVersion)); + } + + exist.getIgnoreDataInfoIds().addAll(Sets.newHashSet(ignoreDataInfoIds.split(","))); + exist.setDataVersion(PersistenceDataBuilder.nextVersion()); + boolean ret = multiClusterSyncRepository.update(exist, NumberUtils.toLong(expectVersion)); + + LOG.info( + "[addIgnoreSyncDataInfoIds]result:{}, remoteDataCenter:{}, ignoreDataInfoIds:{}, expectVersion:{}", + ret, + remoteDataCenter, + ignoreDataInfoIds, + expectVersion); + + CommonResponse response = new CommonResponse(); + response.setSuccess(ret); + return response; + } + + @POST + @Path("/sync/ignoreDataInfoIds/remove") + @Produces(MediaType.APPLICATION_JSON) + public CommonResponse removeIgnoreDataInfoIds( + @FormParam("remoteDataCenter") String remoteDataCenter, + @FormParam("ignoreDataInfoIds") String ignoreDataInfoIds, + @FormParam("token") String token, + @FormParam("expectVersion") String expectVersion) { + if (!AuthChecker.authCheck(token)) { + LOG.error( + "remove ignoreDataInfoIds, remoteDataCenter={}, ignoreDataInfoIds={}, auth check={} fail!", + remoteDataCenter, + ignoreDataInfoIds, + token); + return GenericResponse.buildFailedResponse("auth check fail"); + } + if (StringUtils.isBlank(remoteDataCenter) + || StringUtils.isBlank(ignoreDataInfoIds) + || StringUtils.isBlank(expectVersion)) { + return CommonResponse.buildFailedResponse( + "remoteDataCenter, ignoreDataInfoIds, expectVersion is not allow empty."); + } + + MultiClusterSyncInfo exist = multiClusterSyncRepository.query(remoteDataCenter); + + if (exist == null || exist.getDataVersion() != Long.parseLong(expectVersion)) { + return CommonResponse.buildFailedResponse( + StringFormatter.format( + "remoteDataCenter:{}, expectVersion:{} not exist.", remoteDataCenter, expectVersion)); + } + + exist.getIgnoreDataInfoIds().removeAll(Sets.newHashSet(ignoreDataInfoIds.split(","))); + exist.setDataVersion(PersistenceDataBuilder.nextVersion()); + boolean ret = multiClusterSyncRepository.update(exist, NumberUtils.toLong(expectVersion)); + + LOG.info( + "[removeIgnoreDataInfoIds]result:{}, remoteDataCenter:{}, ignoreDataInfoIds:{}, expectVersion:{}", + ret, + remoteDataCenter, + ignoreDataInfoIds, + expectVersion); + + CommonResponse response = new CommonResponse(); + response.setSuccess(ret); + return response; + } } From 28637f7923b34591c036184d6d2c278c8445c668 Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Thu, 4 Sep 2025 11:33:47 +0800 Subject: [PATCH 05/10] The initial value of the batching configuration should be set to the minimum value for adaptive batching --- .../session/push/PushEfficiencyConfigUpdater.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java index 3101f7634..1200bbb5a 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java @@ -18,14 +18,15 @@ import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; import com.google.common.annotations.VisibleForTesting; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * @author huicha * @date 2025/7/24 @@ -90,6 +91,15 @@ public void updateFromProviderData(PushEfficiencyImproveConfig pushEfficiencyImp this.autoPushEfficiencyRegulator = new AutoPushEfficiencyRegulator(autoPushEfficiencyConfig, this); + + // 这里需要调整下初始配置的值 + if (autoPushEfficiencyConfig.isEnableDebouncingTime()) { + pushEfficiencyImproveConfig.setChangeDebouncingMillis(autoPushEfficiencyConfig.getDebouncingTimeMin()); + } + + if (autoPushEfficiencyConfig.isEnableMaxDebouncingTime()) { + pushEfficiencyImproveConfig.setChangeDebouncingMaxMillis(autoPushEfficiencyConfig.getMaxDebouncingTimeMin()); + } } else { // 新的配置中,关闭了自动化配置,此时如果还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉 this.useAutoPushEfficiency = false; From ac2665df5becdf71b2d95e379dd390e024da01fc Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Mon, 8 Sep 2025 18:14:25 +0800 Subject: [PATCH 06/10] Added an API endpoint to query the current batching configuration. --- .../bootstrap/SessionServerConfiguration.java | 103 ++++-------------- .../session/push/ChangeDebouncingTime.java | 37 +++++++ .../server/session/push/ChangeProcessor.java | 34 +++++- .../push/PushEfficiencyConfigUpdater.java | 11 +- .../PushEfficiencyConfigResource.java | 48 ++++++++ 5 files changed, 141 insertions(+), 92 deletions(-) create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java index 04180cd10..134eadb35 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java @@ -45,30 +45,9 @@ import com.alipay.sofa.registry.server.session.metadata.MetadataCacheRegistry; import com.alipay.sofa.registry.server.session.multi.cluster.DataCenterMetadataCache; import com.alipay.sofa.registry.server.session.multi.cluster.DataCenterMetadataCacheImpl; -import com.alipay.sofa.registry.server.session.node.service.ClientNodeService; -import com.alipay.sofa.registry.server.session.node.service.ClientNodeServiceImpl; -import com.alipay.sofa.registry.server.session.node.service.DataNodeService; -import com.alipay.sofa.registry.server.session.node.service.DataNodeServiceImpl; -import com.alipay.sofa.registry.server.session.node.service.MetaServerServiceImpl; -import com.alipay.sofa.registry.server.session.node.service.SessionMetaServerManager; -import com.alipay.sofa.registry.server.session.providedata.AppRevisionWriteSwitchService; -import com.alipay.sofa.registry.server.session.providedata.CompressPushService; -import com.alipay.sofa.registry.server.session.providedata.ConfigProvideDataWatcher; -import com.alipay.sofa.registry.server.session.providedata.FetchBlackListService; -import com.alipay.sofa.registry.server.session.providedata.FetchCircuitBreakerService; -import com.alipay.sofa.registry.server.session.providedata.FetchClientOffAddressService; -import com.alipay.sofa.registry.server.session.providedata.FetchDataInfoIDBlackListService; -import com.alipay.sofa.registry.server.session.providedata.FetchGrayPushSwitchService; -import com.alipay.sofa.registry.server.session.providedata.FetchPushEfficiencyConfigService; -import com.alipay.sofa.registry.server.session.providedata.FetchShutdownService; -import com.alipay.sofa.registry.server.session.providedata.FetchStopPushService; -import com.alipay.sofa.registry.server.session.providedata.ProvideDataProcessorManager; -import com.alipay.sofa.registry.server.session.push.ChangeProcessor; -import com.alipay.sofa.registry.server.session.push.FirePushService; -import com.alipay.sofa.registry.server.session.push.PushDataGenerator; -import com.alipay.sofa.registry.server.session.push.PushProcessor; -import com.alipay.sofa.registry.server.session.push.PushSwitchService; -import com.alipay.sofa.registry.server.session.push.WatchProcessor; +import com.alipay.sofa.registry.server.session.node.service.*; +import com.alipay.sofa.registry.server.session.providedata.*; +import com.alipay.sofa.registry.server.session.push.*; import com.alipay.sofa.registry.server.session.registry.Registry; import com.alipay.sofa.registry.server.session.registry.RegistryScanCallable; import com.alipay.sofa.registry.server.session.registry.SessionRegistry; @@ -77,65 +56,17 @@ import com.alipay.sofa.registry.server.session.remoting.DataNodeNotifyExchanger; import com.alipay.sofa.registry.server.session.remoting.console.SessionConsoleExchanger; import com.alipay.sofa.registry.server.session.remoting.console.handler.*; -import com.alipay.sofa.registry.server.session.remoting.handler.AppRevisionSliceHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.ClientNodeConnectionHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.DataChangeRequestHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.DataPushRequestHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.DataSlotDiffDigestRequestHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.DataSlotDiffPublisherRequestHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.GetRevisionPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.MetaRevisionHeartbeatPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.MetadataRegisterPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.NotifyProvideDataChangeHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.PublisherHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.PublisherPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.ServiceAppMappingPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.SubscriberHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.SubscriberPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.SyncConfigHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.SyncConfigPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.WatcherHandler; -import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; -import com.alipay.sofa.registry.server.session.resource.ClientsOpenResource; -import com.alipay.sofa.registry.server.session.resource.CompressResource; -import com.alipay.sofa.registry.server.session.resource.ConnectionsResource; -import com.alipay.sofa.registry.server.session.resource.EmergencyApiResource; -import com.alipay.sofa.registry.server.session.resource.HealthResource; -import com.alipay.sofa.registry.server.session.resource.MetadataCacheResource; -import com.alipay.sofa.registry.server.session.resource.PersistenceClientManagerResource; -import com.alipay.sofa.registry.server.session.resource.SessionDigestResource; -import com.alipay.sofa.registry.server.session.resource.SessionOpenResource; -import com.alipay.sofa.registry.server.session.resource.SlotTableStatusResource; +import com.alipay.sofa.registry.server.session.remoting.handler.*; +import com.alipay.sofa.registry.server.session.resource.*; import com.alipay.sofa.registry.server.session.scheduler.timertask.CacheCountTask; import com.alipay.sofa.registry.server.session.scheduler.timertask.SessionCacheDigestTask; import com.alipay.sofa.registry.server.session.scheduler.timertask.SyncClientsHeartbeatTask; import com.alipay.sofa.registry.server.session.slot.SlotTableCache; import com.alipay.sofa.registry.server.session.slot.SlotTableCacheImpl; -import com.alipay.sofa.registry.server.session.store.DataStore; -import com.alipay.sofa.registry.server.session.store.FetchPubSubDataInfoIdService; -import com.alipay.sofa.registry.server.session.store.Interests; -import com.alipay.sofa.registry.server.session.store.SessionDataStore; -import com.alipay.sofa.registry.server.session.store.SessionInterests; -import com.alipay.sofa.registry.server.session.store.SessionWatchers; -import com.alipay.sofa.registry.server.session.store.Watchers; -import com.alipay.sofa.registry.server.session.strategy.AppRevisionHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.PublisherHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.SessionRegistryStrategy; -import com.alipay.sofa.registry.server.session.strategy.SubscriberHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.SyncConfigHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.WatcherHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultAppRevisionHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultPublisherHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSessionRegistryStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSubscriberHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSyncConfigHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultWatcherHandlerStrategy; -import com.alipay.sofa.registry.server.session.wrapper.AccessLimitWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.BlacklistWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.ClientCheckWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.ClientOffWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.DataInfoIDBlacklistWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.WrapperInterceptorManager; +import com.alipay.sofa.registry.server.session.store.*; +import com.alipay.sofa.registry.server.session.strategy.*; +import com.alipay.sofa.registry.server.session.strategy.impl.*; +import com.alipay.sofa.registry.server.session.wrapper.*; import com.alipay.sofa.registry.server.shared.client.manager.BaseClientManagerService; import com.alipay.sofa.registry.server.shared.client.manager.ClientManagerService; import com.alipay.sofa.registry.server.shared.config.CommonConfig; @@ -156,11 +87,6 @@ import com.alipay.sofa.registry.task.MetricsableThreadPoolExecutor; import com.alipay.sofa.registry.util.NamedThreadFactory; import com.alipay.sofa.registry.util.PropertySplitter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.glassfish.jersey.jackson.JacksonFeature; import org.glassfish.jersey.server.ResourceConfig; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -169,6 +95,12 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** * @author shangyu.wh * @version $Id: SessionServerConfiguration.java, v 0.1 2017-11-14 11:39 synex Exp $ @@ -566,6 +498,11 @@ public MetadataCacheResource metadataCacheResource() { public EmergencyApiResource emergencyApiResource() { return new EmergencyApiResource(); } + + @Bean + public PushEfficiencyConfigResource pushEfficiencyConfigResource() { + return new PushEfficiencyConfigResource(); + } } @Configuration diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java new file mode 100644 index 000000000..15317d7d6 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java @@ -0,0 +1,37 @@ +package com.alipay.sofa.registry.server.session.push; + +/** + * @author huicha + * @date 2025/9/8 + */ +public class ChangeDebouncingTime { + + private int changeDebouncingMillis; + + private int changeDebouncingMaxMillis; + + public ChangeDebouncingTime() { + } + + public ChangeDebouncingTime(int changeDebouncingMillis, int changeDebouncingMaxMillis) { + this.changeDebouncingMillis = changeDebouncingMillis; + this.changeDebouncingMaxMillis = changeDebouncingMaxMillis; + } + + public int getChangeDebouncingMillis() { + return changeDebouncingMillis; + } + + public void setChangeDebouncingMillis(int changeDebouncingMillis) { + this.changeDebouncingMillis = changeDebouncingMillis; + } + + public int getChangeDebouncingMaxMillis() { + return changeDebouncingMaxMillis; + } + + public void setChangeDebouncingMaxMillis(int changeDebouncingMaxMillis) { + this.changeDebouncingMaxMillis = changeDebouncingMaxMillis; + } + +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java index 57e3b9cd2..7a33dd09c 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java @@ -23,10 +23,11 @@ import com.alipay.sofa.registry.util.StringFormatter; import com.alipay.sofa.registry.util.WakeUpLoopRunnable; import com.google.common.collect.Maps; -import java.util.*; -import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; +import javax.annotation.PostConstruct; +import java.util.*; + public class ChangeProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ChangeProcessor.class); @@ -64,6 +65,26 @@ public void setWorkDelayTime(PushEfficiencyImproveConfig pushEfficiencyImproveCo } } + public Map getChangeDebouncingMillis() { + Map dcChangeDebouncingTimes = new HashMap<>(dataCenterWorkers.size()); + for (Map.Entry entry : dataCenterWorkers.entrySet()) { + String dataCenter = entry.getKey(); + Worker[] workers = entry.getValue(); + if (workers == null) { + dcChangeDebouncingTimes.put(dataCenter, new ChangeDebouncingTime[0]); + continue; + } + ChangeDebouncingTime[] changeDebouncingTimes = new ChangeDebouncingTime[workers.length]; + for (int index = 0; index < workers.length; index++) { + Worker worker = workers[index]; + ChangeDebouncingTime changeDebouncingTime = worker.getChangeDebouncingTime(); + changeDebouncingTimes[index] = changeDebouncingTime; + } + dcChangeDebouncingTimes.put(dataCenter, changeDebouncingTimes); + } + return dcChangeDebouncingTimes; + } + public void setChangeDebouncingMillis(int changeDebouncingMillis, int changeDebouncingMaxMillis) { for (Map.Entry entry : dataCenterWorkers.entrySet()) { Worker[] workers = entry.getValue(); @@ -135,8 +156,8 @@ public void setChangeDebouncingMillis( this.changeDebouncingMaxMillis = changeDebouncingMaxMillis; } - int changeDebouncingMillis; - int changeDebouncingMaxMillis; + volatile int changeDebouncingMillis; + volatile int changeDebouncingMaxMillis; int changeTaskWaitingMillis = 100; Worker(int changeDebouncingMillis, int changeDebouncingMaxMillis) { @@ -219,6 +240,11 @@ public void runUnthrowable() { public int getWaitingMillis() { return changeTaskWaitingMillis; } + + public ChangeDebouncingTime getChangeDebouncingTime() { + return new ChangeDebouncingTime(this.changeDebouncingMillis, this.changeDebouncingMaxMillis); + } + } static final class ChangeKey { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java index 1200bbb5a..33209239f 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java @@ -18,15 +18,14 @@ import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - /** * @author huicha * @date 2025/7/24 @@ -94,11 +93,13 @@ public void updateFromProviderData(PushEfficiencyImproveConfig pushEfficiencyImp // 这里需要调整下初始配置的值 if (autoPushEfficiencyConfig.isEnableDebouncingTime()) { - pushEfficiencyImproveConfig.setChangeDebouncingMillis(autoPushEfficiencyConfig.getDebouncingTimeMin()); + pushEfficiencyImproveConfig.setChangeDebouncingMillis( + autoPushEfficiencyConfig.getDebouncingTimeMin()); } if (autoPushEfficiencyConfig.isEnableMaxDebouncingTime()) { - pushEfficiencyImproveConfig.setChangeDebouncingMaxMillis(autoPushEfficiencyConfig.getMaxDebouncingTimeMin()); + pushEfficiencyImproveConfig.setChangeDebouncingMaxMillis( + autoPushEfficiencyConfig.getMaxDebouncingTimeMin()); } } else { // 新的配置中,关闭了自动化配置,此时如果还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉 diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java new file mode 100644 index 000000000..ad5a6e3dd --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java @@ -0,0 +1,48 @@ +package com.alipay.sofa.registry.server.session.resource; + +import com.alipay.sofa.registry.common.model.GenericResponse; +import com.alipay.sofa.registry.server.session.push.ChangeDebouncingTime; +import com.alipay.sofa.registry.server.session.push.ChangeProcessor; +import com.alipay.sofa.registry.server.shared.resource.AuthChecker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import java.util.Map; + +/** + * @author huicha + * @date 2025/9/8 + */ +@Path("api/push/efficiency") +public class PushEfficiencyConfigResource { + + private Logger LOGGER = LoggerFactory.getLogger(PushEfficiencyConfigResource.class); + + @Autowired + private ChangeProcessor changeProcessor; + + @GET + @Path("/getChangeDebouncingMillis") + @Produces(MediaType.APPLICATION_JSON) + public GenericResponse> getChangeDebouncingMillis(@HeaderParam("token") String token) { + try { + if (!AuthChecker.authCheck(token)) { + LOGGER.error("[module=PushEfficiencyConfigResource][method=getChangeDebouncingMillis] auth check={} fail!", token); + return new GenericResponse().fillFailed("auth check fail"); + } + + Map changeDebouncingTimes = this.changeProcessor.getChangeDebouncingMillis(); + return new GenericResponse().fillSucceed(changeDebouncingTimes); + } catch (Throwable throwable) { + LOGGER.error("[module=PushEfficiencyConfigResource][method=getChangeDebouncingMillis] getChangeDebouncingMillis exception", throwable); + return new GenericResponse().fillFailed("getChangeDebouncingMillis exception"); + } + } + +} From 3bc7b149cda825549965087fbe7559374717a302 Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Tue, 9 Sep 2025 19:27:56 +0800 Subject: [PATCH 07/10] Added an API endpoint to query the current batching configuration. --- .../bootstrap/SessionServerConfiguration.java | 11 +++-- .../session/push/ChangeDebouncingTime.java | 20 +++++++-- .../server/session/push/ChangeProcessor.java | 9 ++-- .../PushEfficiencyConfigResource.java | 43 +++++++++++++------ 4 files changed, 57 insertions(+), 26 deletions(-) diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java index 134eadb35..151b8bf23 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java @@ -87,6 +87,11 @@ import com.alipay.sofa.registry.task.MetricsableThreadPoolExecutor; import com.alipay.sofa.registry.util.NamedThreadFactory; import com.alipay.sofa.registry.util.PropertySplitter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.glassfish.jersey.jackson.JacksonFeature; import org.glassfish.jersey.server.ResourceConfig; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -95,12 +100,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - /** * @author shangyu.wh * @version $Id: SessionServerConfiguration.java, v 0.1 2017-11-14 11:39 synex Exp $ diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java index 15317d7d6..9e2a6990e 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alipay.sofa.registry.server.session.push; /** @@ -10,8 +26,7 @@ public class ChangeDebouncingTime { private int changeDebouncingMaxMillis; - public ChangeDebouncingTime() { - } + public ChangeDebouncingTime() {} public ChangeDebouncingTime(int changeDebouncingMillis, int changeDebouncingMaxMillis) { this.changeDebouncingMillis = changeDebouncingMillis; @@ -33,5 +48,4 @@ public int getChangeDebouncingMaxMillis() { public void setChangeDebouncingMaxMillis(int changeDebouncingMaxMillis) { this.changeDebouncingMaxMillis = changeDebouncingMaxMillis; } - } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java index 7a33dd09c..7a6186715 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java @@ -23,10 +23,9 @@ import com.alipay.sofa.registry.util.StringFormatter; import com.alipay.sofa.registry.util.WakeUpLoopRunnable; import com.google.common.collect.Maps; -import org.springframework.beans.factory.annotation.Autowired; - -import javax.annotation.PostConstruct; import java.util.*; +import javax.annotation.PostConstruct; +import org.springframework.beans.factory.annotation.Autowired; public class ChangeProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ChangeProcessor.class); @@ -66,7 +65,8 @@ public void setWorkDelayTime(PushEfficiencyImproveConfig pushEfficiencyImproveCo } public Map getChangeDebouncingMillis() { - Map dcChangeDebouncingTimes = new HashMap<>(dataCenterWorkers.size()); + Map dcChangeDebouncingTimes = + new HashMap<>(dataCenterWorkers.size()); for (Map.Entry entry : dataCenterWorkers.entrySet()) { String dataCenter = entry.getKey(); Worker[] workers = entry.getValue(); @@ -244,7 +244,6 @@ public int getWaitingMillis() { public ChangeDebouncingTime getChangeDebouncingTime() { return new ChangeDebouncingTime(this.changeDebouncingMillis, this.changeDebouncingMaxMillis); } - } static final class ChangeKey { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java index ad5a6e3dd..93bb494f4 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java @@ -1,19 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alipay.sofa.registry.server.session.resource; import com.alipay.sofa.registry.common.model.GenericResponse; import com.alipay.sofa.registry.server.session.push.ChangeDebouncingTime; import com.alipay.sofa.registry.server.session.push.ChangeProcessor; import com.alipay.sofa.registry.server.shared.resource.AuthChecker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; - +import java.util.Map; import javax.ws.rs.GET; import javax.ws.rs.HeaderParam; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; /** * @author huicha @@ -24,25 +39,29 @@ public class PushEfficiencyConfigResource { private Logger LOGGER = LoggerFactory.getLogger(PushEfficiencyConfigResource.class); - @Autowired - private ChangeProcessor changeProcessor; + @Autowired private ChangeProcessor changeProcessor; @GET @Path("/getChangeDebouncingMillis") @Produces(MediaType.APPLICATION_JSON) - public GenericResponse> getChangeDebouncingMillis(@HeaderParam("token") String token) { + public GenericResponse> getChangeDebouncingMillis( + @HeaderParam("token") String token) { try { if (!AuthChecker.authCheck(token)) { - LOGGER.error("[module=PushEfficiencyConfigResource][method=getChangeDebouncingMillis] auth check={} fail!", token); + LOGGER.error( + "[module=PushEfficiencyConfigResource][method=getChangeDebouncingMillis] auth check={} fail!", + token); return new GenericResponse().fillFailed("auth check fail"); } - Map changeDebouncingTimes = this.changeProcessor.getChangeDebouncingMillis(); + Map changeDebouncingTimes = + this.changeProcessor.getChangeDebouncingMillis(); return new GenericResponse().fillSucceed(changeDebouncingTimes); } catch (Throwable throwable) { - LOGGER.error("[module=PushEfficiencyConfigResource][method=getChangeDebouncingMillis] getChangeDebouncingMillis exception", throwable); + LOGGER.error( + "[module=PushEfficiencyConfigResource][method=getChangeDebouncingMillis] getChangeDebouncingMillis exception", + throwable); return new GenericResponse().fillFailed("getChangeDebouncingMillis exception"); } } - } From 1203a59da6e00a94d11e2d8d8e545dcbe23888ed Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Mon, 22 Sep 2025 15:38:06 +0800 Subject: [PATCH 08/10] bugfix: Adjust the initial and default values of the automated batching configuration --- .../bootstrap/MetaServerConfiguration.java | 5 ++ .../meta/resource/DataCenterResource.java | 57 +++++++++++++++ .../push/AutoPushEfficiencyRegulator.java | 24 +++++- .../push/PushEfficiencyConfigUpdater.java | 8 +- .../push/AutoPushEfficiencyRegulatorTest.java | 73 ++++++++++++++++++- 5 files changed, 158 insertions(+), 9 deletions(-) create mode 100644 server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java index a8cf59645..0571e2ca4 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java @@ -414,6 +414,11 @@ public MetricsResource metricsResource() { public RegistryCoreOpsResource registryCoreOpsResource() { return new RegistryCoreOpsResource(); } + + @Bean + public DataCenterResource dataCenterResource() { + return new DataCenterResource(); + } } @Configuration diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java new file mode 100644 index 000000000..d792146d3 --- /dev/null +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.meta.resource; + +import com.alipay.sofa.registry.core.model.Result; +import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig; +import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareRestController; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author huicha + * @date 2025/9/22 + */ +@Path("datacenter") +@LeaderAwareRestController +public class DataCenterResource { + + private Logger LOGGER = LoggerFactory.getLogger(DataCenterResource.class); + + @Autowired private MetaServerConfig metaServerConfig; + + @GET + @Path("query") + @Produces(MediaType.APPLICATION_JSON) + public Result queryBlackList() { + try { + String localDataCenter = this.metaServerConfig.getLocalDataCenter(); + Result result = Result.success(); + result.setMessage(localDataCenter); + return result; + } catch (Throwable throwable) { + LOGGER.error("Query meta local datacenter exception", throwable); + return Result.failed("Query meta local datacenter exception"); + } + } + +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java index c07de7595..379a30ade 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java @@ -83,8 +83,12 @@ public class AutoPushEfficiencyRegulator extends LoopRunnable { private final BooleanMetric trafficOperateLimitSwitch; public AutoPushEfficiencyRegulator( - AutoPushEfficiencyConfig autoPushEfficiencyConfig, + PushEfficiencyImproveConfig pushEfficiencyImproveConfig, PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater) { + // 获取自适应攒批配置 + AutoPushEfficiencyConfig autoPushEfficiencyConfig = + pushEfficiencyImproveConfig.getAutoPushEfficiencyConfig(); + // 初始化窗口相关配置 this.windowTime = autoPushEfficiencyConfig.getWindowTimeMillis(); this.windowNum = autoPushEfficiencyConfig.getWindowNum(); @@ -106,12 +110,14 @@ public AutoPushEfficiencyRegulator( this.debouncingTime = new IntMetric( autoPushEfficiencyConfig.isEnableDebouncingTime(), + pushEfficiencyImproveConfig.getChangeDebouncingMillis(), autoPushEfficiencyConfig.getDebouncingTimeMax(), autoPushEfficiencyConfig.getDebouncingTimeMin(), autoPushEfficiencyConfig.getDebouncingTimeStep()); this.maxDebouncingTime = new IntMetric( autoPushEfficiencyConfig.isEnableMaxDebouncingTime(), + pushEfficiencyImproveConfig.getChangeDebouncingMaxMillis(), autoPushEfficiencyConfig.getMaxDebouncingTimeMax(), autoPushEfficiencyConfig.getMaxDebouncingTimeMin(), autoPushEfficiencyConfig.getMaxDebouncingTimeStep()); @@ -311,6 +317,9 @@ class IntMetric { private final boolean enable; + // 当不启用自适应攒批时,指标的默认值 + private final int defaultV; + // 指标的最大值 private final int max; @@ -323,8 +332,9 @@ class IntMetric { // 当前指标的值 private int current; - public IntMetric(boolean enable, int max, int min, int step) { + public IntMetric(boolean enable, int defaultV, int max, int min, int step) { this.enable = enable; + this.defaultV = defaultV; this.max = max; this.min = min; this.step = step; @@ -369,7 +379,15 @@ public boolean isEnable() { } public int load() { - return this.current; + if (this.enable) { + return this.current; + } else { + return this.defaultV; + } + } + + public int loadDefaultV() { + return this.defaultV; } } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java index 33209239f..40379e5cc 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java @@ -88,19 +88,21 @@ public void updateFromProviderData(PushEfficiencyImproveConfig pushEfficiencyImp this.autoPushEfficiencyRegulator.close(); } - this.autoPushEfficiencyRegulator = - new AutoPushEfficiencyRegulator(autoPushEfficiencyConfig, this); - // 这里需要调整下初始配置的值 if (autoPushEfficiencyConfig.isEnableDebouncingTime()) { + // 当自适应攒批需要调整 debouncing time 的时候,需要将 debouncing time 的初始值设置为 min pushEfficiencyImproveConfig.setChangeDebouncingMillis( autoPushEfficiencyConfig.getDebouncingTimeMin()); } if (autoPushEfficiencyConfig.isEnableMaxDebouncingTime()) { + // 当自适应攒批需要调整 max debouncing time 的时候,需要将 debouncing time 的初始值设置为 min pushEfficiencyImproveConfig.setChangeDebouncingMaxMillis( autoPushEfficiencyConfig.getMaxDebouncingTimeMin()); } + + this.autoPushEfficiencyRegulator = + new AutoPushEfficiencyRegulator(pushEfficiencyImproveConfig, this); } else { // 新的配置中,关闭了自动化配置,此时如果还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉 this.useAutoPushEfficiency = false; diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java index 437434fa3..62cb09b6a 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.registry.server.session.push; +import java.util.Collections; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -29,9 +30,9 @@ public class AutoPushEfficiencyRegulatorTest { @Test - public void test() throws InterruptedException { - AutoPushEfficiencyConfig autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + public void testUpdateDebouncingTimeAndMaxDebouncingTime() throws InterruptedException { // 开启自动化配置 + AutoPushEfficiencyConfig autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); autoPushEfficiencyConfig.setEnableDebouncingTime(true); autoPushEfficiencyConfig.setEnableMaxDebouncingTime(true); @@ -42,13 +43,16 @@ public void test() throws InterruptedException { autoPushEfficiencyConfig.setWindowNum(10); autoPushEfficiencyConfig.setWindowTimeMillis(100); + PushEfficiencyImproveConfig pushEfficiencyImproveConfig = new PushEfficiencyImproveConfig(); + pushEfficiencyImproveConfig.setAutoPushEfficiencyConfig(autoPushEfficiencyConfig); + MockPushEfficiencyConfigUpdater mockPushEfficiencyConfigUpdater = new MockPushEfficiencyConfigUpdater(); PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater = Mockito.mock(PushEfficiencyConfigUpdater.class, mockPushEfficiencyConfigUpdater); AutoPushEfficiencyRegulator autoPushEfficiencyRegulator = - new AutoPushEfficiencyRegulator(autoPushEfficiencyConfig, pushEfficiencyConfigUpdater); + new AutoPushEfficiencyRegulator(pushEfficiencyImproveConfig, pushEfficiencyConfigUpdater); try { for (int loop = 0; loop < 40; loop++) { @@ -77,6 +81,69 @@ public void test() throws InterruptedException { autoPushEfficiencyRegulator.close(); } } + + @Test + public void testOnlyUpdateDebouncingTime() throws InterruptedException { + // 开启自动化配置 + AutoPushEfficiencyConfig autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); + autoPushEfficiencyConfig.setEnableDebouncingTime(true); + + // 不自适应调整 max debouncing time,但是这里设置单独的初始配置 + // 预期这个初始配置不生效 + autoPushEfficiencyConfig.setEnableMaxDebouncingTime(false); + autoPushEfficiencyConfig.setMaxDebouncingTimeMin(10); + autoPushEfficiencyConfig.setMaxDebouncingTimeMax(100); + autoPushEfficiencyConfig.setMaxDebouncingTimeStep(10); + + // 总推送数超过 10 笔就触发增加攒批时长 + autoPushEfficiencyConfig.setPushCountThreshold(10); + + autoPushEfficiencyConfig.setWindowNum(10); + autoPushEfficiencyConfig.setWindowTimeMillis(100); + + PushEfficiencyImproveConfig pushEfficiencyImproveConfig = new PushEfficiencyImproveConfig(); + pushEfficiencyImproveConfig.setZoneSet(Collections.singleton("ALL_ZONE")); + pushEfficiencyImproveConfig.setAutoPushEfficiencyConfig(autoPushEfficiencyConfig); + + // 设置一个特别的 max debouncing time,预期后面自适应调整攒批配置的时候,max debouncing time 始终为 15000 + pushEfficiencyImproveConfig.setChangeDebouncingMaxMillis(15000); + + MockPushEfficiencyConfigUpdater mockPushEfficiencyConfigUpdater = + new MockPushEfficiencyConfigUpdater(); + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater = + Mockito.mock(PushEfficiencyConfigUpdater.class, mockPushEfficiencyConfigUpdater); + + AutoPushEfficiencyRegulator autoPushEfficiencyRegulator = + new AutoPushEfficiencyRegulator(pushEfficiencyImproveConfig, pushEfficiencyConfigUpdater); + + try { + for (int loop = 0; loop < 40; loop++) { + for (int i = 0; i < 5; i++) { + autoPushEfficiencyRegulator.safeIncrementPushCount(); + } + Thread.sleep(50); + } + + int debouncingTime = mockPushEfficiencyConfigUpdater.getDebouncingTime(); + int maxDebouncingTime = mockPushEfficiencyConfigUpdater.getMaxDebouncingTime(); + Assert.assertEquals( + debouncingTime, 1000 /*AutoPushEfficiencyConfig.DEFAULT_DEBOUNCING_TIME_MAX*/); + Assert.assertEquals( + maxDebouncingTime, 15000 /*AutoPushEfficiencyConfig.DEFAULT_MAX_DEBOUNCING_TIME_MAX*/); + + Thread.sleep(2000); + + debouncingTime = mockPushEfficiencyConfigUpdater.getDebouncingTime(); + maxDebouncingTime = mockPushEfficiencyConfigUpdater.getMaxDebouncingTime(); + Assert.assertEquals( + debouncingTime, 100 /*AutoPushEfficiencyConfig.DEFAULT_DEBOUNCING_TIME_MIN*/); + Assert.assertEquals( + maxDebouncingTime, 15000 /*AutoPushEfficiencyConfig.DEFAULT_MAX_DEBOUNCING_TIME_MIN*/); + } finally { + autoPushEfficiencyRegulator.close(); + } + } } class MockPushEfficiencyConfigUpdater implements Answer { From 693e7988f833a370ab3e6c2893d9dbb01356e2c0 Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Mon, 22 Sep 2025 15:40:24 +0800 Subject: [PATCH 09/10] release v6.6.1-auto-regulator --- client/all/pom.xml | 2 +- client/api/pom.xml | 2 +- client/impl/pom.xml | 2 +- client/log/pom.xml | 2 +- client/pom.xml | 2 +- core/pom.xml | 2 +- pom.xml | 2 +- server/common/model/pom.xml | 2 +- server/common/pom.xml | 2 +- server/common/util/pom.xml | 2 +- server/distribution/all/pom.xml | 2 +- server/distribution/pom.xml | 2 +- server/pom.xml | 2 +- server/remoting/api/pom.xml | 2 +- server/remoting/bolt/pom.xml | 2 +- server/remoting/http/pom.xml | 2 +- server/remoting/pom.xml | 2 +- server/server/data/pom.xml | 2 +- server/server/integration/pom.xml | 2 +- server/server/meta/pom.xml | 2 +- server/server/pom.xml | 2 +- server/server/session/pom.xml | 2 +- server/server/shared/pom.xml | 2 +- server/store/api/pom.xml | 2 +- server/store/jdbc/pom.xml | 2 +- server/store/jraft/pom.xml | 2 +- server/store/pom.xml | 2 +- test/pom.xml | 2 +- 28 files changed, 28 insertions(+), 28 deletions(-) diff --git a/client/all/pom.xml b/client/all/pom.xml index d9a781ea0..3ee2a5bbb 100644 --- a/client/all/pom.xml +++ b/client/all/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-client-all - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ${project.groupId}:${project.artifactId} http://github.com/alipay/sofa-registry diff --git a/client/api/pom.xml b/client/api/pom.xml index 666a9a229..aa2336782 100644 --- a/client/api/pom.xml +++ b/client/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/client/impl/pom.xml b/client/impl/pom.xml index c8af6dc12..1c081fcba 100644 --- a/client/impl/pom.xml +++ b/client/impl/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/client/log/pom.xml b/client/log/pom.xml index b7ee11855..3c6c3394a 100644 --- a/client/log/pom.xml +++ b/client/log/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/client/pom.xml b/client/pom.xml index 2c19399bd..eba3f3d26 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -7,7 +7,7 @@ com.alipay.sofa registry-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 49f827f4f..4975bdf03 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index 3117501f8..78e7b3df3 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.alipay.sofa registry-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator pom diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml index 18fcf50d5..bebfe6552 100644 --- a/server/common/model/pom.xml +++ b/server/common/model/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-common - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/common/pom.xml b/server/common/pom.xml index ebc6d64ae..167da730b 100644 --- a/server/common/pom.xml +++ b/server/common/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/common/util/pom.xml b/server/common/util/pom.xml index db5b9882e..9e9f00475 100644 --- a/server/common/util/pom.xml +++ b/server/common/util/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-common - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/distribution/all/pom.xml b/server/distribution/all/pom.xml index 972fef99b..819270431 100644 --- a/server/distribution/all/pom.xml +++ b/server/distribution/all/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-distribution - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/distribution/pom.xml b/server/distribution/pom.xml index c116804fa..7c0274ac5 100644 --- a/server/distribution/pom.xml +++ b/server/distribution/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-server-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/pom.xml b/server/pom.xml index 553cea954..14c28f247 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -7,7 +7,7 @@ com.alipay.sofa registry-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml diff --git a/server/remoting/api/pom.xml b/server/remoting/api/pom.xml index ace1dae3a..fe1498ab8 100644 --- a/server/remoting/api/pom.xml +++ b/server/remoting/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/remoting/bolt/pom.xml b/server/remoting/bolt/pom.xml index bcaca1deb..05e98d64b 100644 --- a/server/remoting/bolt/pom.xml +++ b/server/remoting/bolt/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/remoting/http/pom.xml b/server/remoting/http/pom.xml index 558d34b3b..bfd22b130 100644 --- a/server/remoting/http/pom.xml +++ b/server/remoting/http/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/remoting/pom.xml b/server/remoting/pom.xml index a98af9f0c..b2b328518 100644 --- a/server/remoting/pom.xml +++ b/server/remoting/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/data/pom.xml b/server/server/data/pom.xml index f85abe93e..33b052ab9 100644 --- a/server/server/data/pom.xml +++ b/server/server/data/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/integration/pom.xml b/server/server/integration/pom.xml index 881e6c629..4ea452cf0 100644 --- a/server/server/integration/pom.xml +++ b/server/server/integration/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/meta/pom.xml b/server/server/meta/pom.xml index 7a06154af..c8b876933 100644 --- a/server/server/meta/pom.xml +++ b/server/server/meta/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/pom.xml b/server/server/pom.xml index b396d367e..a2c832c5d 100644 --- a/server/server/pom.xml +++ b/server/server/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/session/pom.xml b/server/server/session/pom.xml index d199a3450..cc2c3cb53 100644 --- a/server/server/session/pom.xml +++ b/server/server/session/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/shared/pom.xml b/server/server/shared/pom.xml index 5bf85e5de..9f3f01f3e 100644 --- a/server/server/shared/pom.xml +++ b/server/server/shared/pom.xml @@ -5,7 +5,7 @@ registry-server com.alipay.sofa - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator 4.0.0 diff --git a/server/store/api/pom.xml b/server/store/api/pom.xml index 708c72970..7de0d1b84 100644 --- a/server/store/api/pom.xml +++ b/server/store/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-store - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/store/jdbc/pom.xml b/server/store/jdbc/pom.xml index a6b2fef81..9b41cbe6c 100644 --- a/server/store/jdbc/pom.xml +++ b/server/store/jdbc/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-store - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml diff --git a/server/store/jraft/pom.xml b/server/store/jraft/pom.xml index 83ebe9164..3dca5d97d 100644 --- a/server/store/jraft/pom.xml +++ b/server/store/jraft/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-store - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml diff --git a/server/store/pom.xml b/server/store/pom.xml index e3850c84b..63723a38e 100644 --- a/server/store/pom.xml +++ b/server/store/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/test/pom.xml b/test/pom.xml index 91069b453..42cd0aaa9 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-parent - 6.6.1-auto-regulator-SNAPSHOT + 6.6.1-auto-regulator ../pom.xml 4.0.0 From 5a73faa96f0e6e03cae3977398e25ef9f5d0e8e5 Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Mon, 22 Sep 2025 15:57:05 +0800 Subject: [PATCH 10/10] fix unit test --- .../session/push/PushEfficiencyConfigUpdater.java | 10 ++++++++-- .../session/push/PushEfficiencyConfigUpdaterTest.java | 7 +++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java index 40379e5cc..1fa7f4842 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java @@ -18,14 +18,15 @@ import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; import com.google.common.annotations.VisibleForTesting; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * @author huicha * @date 2025/7/24 @@ -210,4 +211,9 @@ public void setPushProcessor(PushProcessor pushProcessor) { public void setFirePushService(FirePushService firePushService) { this.firePushService = firePushService; } + + @VisibleForTesting + public void setClientManagerResource(ClientManagerResource clientManagerResource) { + this.clientManagerResource = clientManagerResource; + } } diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java index 14829c9b4..dd8d80ee1 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.registry.server.session.push; +import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -31,11 +32,13 @@ public void testUpdateFromProviderData() { ChangeProcessor changeProcessor = Mockito.mock(ChangeProcessor.class); PushProcessor pushProcessor = Mockito.mock(PushProcessor.class); FirePushService firePushService = Mockito.mock(FirePushService.class); + ClientManagerResource clientManagerResource = Mockito.mock(ClientManagerResource.class); PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater = new PushEfficiencyConfigUpdater(); pushEfficiencyConfigUpdater.setChangeProcessor(changeProcessor); pushEfficiencyConfigUpdater.setPushProcessor(pushProcessor); pushEfficiencyConfigUpdater.setFirePushService(firePushService); + pushEfficiencyConfigUpdater.setClientManagerResource(clientManagerResource); // 更新没有开启自动化配置,因此预期是 null pushEfficiencyConfigUpdater.updateFromProviderData(new PushEfficiencyImproveConfig()); @@ -58,8 +61,6 @@ public void testUpdateFromProviderData() { autoPushEfficiencyRegulator = pushEfficiencyConfigUpdater.getAutoPushEfficiencyRegulator(); Assert.assertNotNull(autoPushEfficiencyRegulator); - Long autoPushEfficiencyRegulatorId = autoPushEfficiencyRegulator.getId(); - Assert.assertEquals(1L, (long) autoPushEfficiencyRegulatorId); // 第三次仍然开启,但是我们修改一部分配置 autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); @@ -79,12 +80,10 @@ public void testUpdateFromProviderData() { pushEfficiencyConfigUpdater.getAutoPushEfficiencyRegulator(); Assert.assertNotNull(newAutoPushEfficiencyRegulator); - Long newAutoPushEfficiencyRegulatorId = newAutoPushEfficiencyRegulator.getId(); int windowNum = newAutoPushEfficiencyRegulator.getWindowNum(); int windowSize = newAutoPushEfficiencyRegulator.getWindowsSize(); long pushCountThreshold = newAutoPushEfficiencyRegulator.getPushCountThreshold(); - Assert.assertEquals(2L, (long) newAutoPushEfficiencyRegulatorId); Assert.assertEquals(3, windowNum); Assert.assertEquals(3, windowSize); Assert.assertEquals(10, pushCountThreshold);