diff --git a/client/all/pom.xml b/client/all/pom.xml index 659ef9630..3ee2a5bbb 100644 --- a/client/all/pom.xml +++ b/client/all/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-client-all - 6.6.0 + 6.6.1-auto-regulator ${project.groupId}:${project.artifactId} http://github.com/alipay/sofa-registry diff --git a/client/api/pom.xml b/client/api/pom.xml index 813ed1a56..aa2336782 100644 --- a/client/api/pom.xml +++ b/client/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/client/impl/pom.xml b/client/impl/pom.xml index eeb52e65b..1c081fcba 100644 --- a/client/impl/pom.xml +++ b/client/impl/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/client/log/pom.xml b/client/log/pom.xml index 5a604f2b9..3c6c3394a 100644 --- a/client/log/pom.xml +++ b/client/log/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-client-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/client/pom.xml b/client/pom.xml index ac8f03fa7..eba3f3d26 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -7,7 +7,7 @@ com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 65c0a4404..4975bdf03 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index 8993c7615..78e7b3df3 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator pom diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml index e2bdfa81f..bebfe6552 100644 --- a/server/common/model/pom.xml +++ b/server/common/model/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-common - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/common/pom.xml b/server/common/pom.xml index 8006db3b7..167da730b 100644 --- a/server/common/pom.xml +++ b/server/common/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/common/util/pom.xml b/server/common/util/pom.xml index b5dad36f5..9e9f00475 100644 --- a/server/common/util/pom.xml +++ b/server/common/util/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-common - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/distribution/all/pom.xml b/server/distribution/all/pom.xml index 71314cf16..819270431 100644 --- a/server/distribution/all/pom.xml +++ b/server/distribution/all/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-distribution - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/distribution/pom.xml b/server/distribution/pom.xml index e8088f885..7c0274ac5 100644 --- a/server/distribution/pom.xml +++ b/server/distribution/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/pom.xml b/server/pom.xml index dcb3dfae7..14c28f247 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -7,7 +7,7 @@ com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml diff --git a/server/remoting/api/pom.xml b/server/remoting/api/pom.xml index 038267cc5..fe1498ab8 100644 --- a/server/remoting/api/pom.xml +++ b/server/remoting/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/remoting/bolt/pom.xml b/server/remoting/bolt/pom.xml index 48e58c09f..05e98d64b 100644 --- a/server/remoting/bolt/pom.xml +++ b/server/remoting/bolt/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/remoting/http/pom.xml b/server/remoting/http/pom.xml index 7ccbcac31..bfd22b130 100644 --- a/server/remoting/http/pom.xml +++ b/server/remoting/http/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-remoting - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/remoting/pom.xml b/server/remoting/pom.xml index f5c17333f..b2b328518 100644 --- a/server/remoting/pom.xml +++ b/server/remoting/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/data/pom.xml b/server/server/data/pom.xml index af9af0654..33b052ab9 100644 --- a/server/server/data/pom.xml +++ b/server/server/data/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/integration/pom.xml b/server/server/integration/pom.xml index 854ec5ac8..4ea452cf0 100644 --- a/server/server/integration/pom.xml +++ b/server/server/integration/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/meta/pom.xml b/server/server/meta/pom.xml index d4dc470dd..c8b876933 100644 --- a/server/server/meta/pom.xml +++ b/server/server/meta/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java index a08a66f5a..70fb27f71 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java @@ -42,14 +42,13 @@ import com.github.rholder.retry.WaitStrategies; import com.google.common.base.Predicate; import java.lang.annotation.Annotation; -import java.util.Collection; -import java.util.Date; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Resource; import javax.ws.rs.Path; import javax.ws.rs.ext.Provider; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.glassfish.jersey.server.ResourceConfig; import org.springframework.beans.factory.annotation.Autowired; @@ -236,12 +235,22 @@ private void renewNode() { private void openSessionRegisterServer() { try { if (rpcServerForSessionStarted.compareAndSet(false, true)) { + List mergedSessionServerHandlers = + new ArrayList<>(this.sessionServerHandlers); + + Collection customSessionServerHandlers = + this.customSessionServerHandlers(); + if (CollectionUtils.isNotEmpty(customSessionServerHandlers)) { + mergedSessionServerHandlers.addAll(this.customSessionServerHandlers()); + } + sessionServer = boltExchange.open( new URL( NetUtil.getLocalAddress().getHostAddress(), metaServerConfig.getSessionServerPort()), - sessionServerHandlers.toArray(new ChannelHandler[sessionServerHandlers.size()])); + mergedSessionServerHandlers.toArray( + new ChannelHandler[mergedSessionServerHandlers.size()])); LOGGER.info( "Open session node register server port {} success!", @@ -257,6 +266,10 @@ private void openSessionRegisterServer() { } } + protected Collection customSessionServerHandlers() { + return Collections.emptyList(); + } + private void openDataRegisterServer() { try { if (rpcServerForDataStarted.compareAndSet(false, true)) { diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java index a8cf59645..0571e2ca4 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java @@ -414,6 +414,11 @@ public MetricsResource metricsResource() { public RegistryCoreOpsResource registryCoreOpsResource() { return new RegistryCoreOpsResource(); } + + @Bean + public DataCenterResource dataCenterResource() { + return new DataCenterResource(); + } } @Configuration diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java new file mode 100644 index 000000000..d792146d3 --- /dev/null +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.meta.resource; + +import com.alipay.sofa.registry.core.model.Result; +import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig; +import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareRestController; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author huicha + * @date 2025/9/22 + */ +@Path("datacenter") +@LeaderAwareRestController +public class DataCenterResource { + + private Logger LOGGER = LoggerFactory.getLogger(DataCenterResource.class); + + @Autowired private MetaServerConfig metaServerConfig; + + @GET + @Path("query") + @Produces(MediaType.APPLICATION_JSON) + public Result queryBlackList() { + try { + String localDataCenter = this.metaServerConfig.getLocalDataCenter(); + Result result = Result.success(); + result.setMessage(localDataCenter); + return result; + } catch (Throwable throwable) { + LOGGER.error("Query meta local datacenter exception", throwable); + return Result.failed("Query meta local datacenter exception"); + } + } + +} diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java index 1583b995b..0fe33da97 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java @@ -27,12 +27,8 @@ import com.alipay.sofa.registry.util.StringFormatter; import com.google.common.collect.Sets; import java.util.Locale; -import javax.ws.rs.FormParam; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; +import java.util.Set; +import javax.ws.rs.*; import javax.ws.rs.core.MediaType; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.math.NumberUtils; @@ -66,6 +62,16 @@ public GenericResponse query( return response; } + @GET + @Path("queryAll") + @Produces(MediaType.APPLICATION_JSON) + public GenericResponse> queryAll() { + GenericResponse> response = new GenericResponse(); + Set queryResult = multiClusterSyncRepository.queryLocalSyncInfos(); + response.fillSucceed(queryResult); + return response; + } + @POST @Path("/save") @Produces(MediaType.APPLICATION_JSON) @@ -517,4 +523,99 @@ public CommonResponse removeConfig( response.setSuccess(ret > 0); return response; } + + @POST + @Path("/sync/ignoreDataInfoIds/add") + @Produces(MediaType.APPLICATION_JSON) + public CommonResponse addIgnoreSyncDataInfoIds( + @FormParam("remoteDataCenter") String remoteDataCenter, + @FormParam("ignoreDataInfoIds") String ignoreDataInfoIds, + @FormParam("token") String token, + @FormParam("expectVersion") String expectVersion) { + if (!AuthChecker.authCheck(token)) { + LOG.error( + "add ignoreDataInfoIds, remoteDataCenter={}, ignoreDataInfoIds={}, auth check={} fail!", + remoteDataCenter, + ignoreDataInfoIds, + token); + return GenericResponse.buildFailedResponse("auth check fail"); + } + + if (StringUtils.isBlank(remoteDataCenter) + || StringUtils.isBlank(ignoreDataInfoIds) + || StringUtils.isBlank(expectVersion)) { + return CommonResponse.buildFailedResponse( + "remoteDataCenter, ignoreDataInfoIds, expectVersion is not allow empty."); + } + + MultiClusterSyncInfo exist = multiClusterSyncRepository.query(remoteDataCenter); + + if (exist == null || exist.getDataVersion() != Long.parseLong(expectVersion)) { + return CommonResponse.buildFailedResponse( + StringFormatter.format( + "remoteDataCenter:{}, expectVersion:{} not exist.", remoteDataCenter, expectVersion)); + } + + exist.getIgnoreDataInfoIds().addAll(Sets.newHashSet(ignoreDataInfoIds.split(","))); + exist.setDataVersion(PersistenceDataBuilder.nextVersion()); + boolean ret = multiClusterSyncRepository.update(exist, NumberUtils.toLong(expectVersion)); + + LOG.info( + "[addIgnoreSyncDataInfoIds]result:{}, remoteDataCenter:{}, ignoreDataInfoIds:{}, expectVersion:{}", + ret, + remoteDataCenter, + ignoreDataInfoIds, + expectVersion); + + CommonResponse response = new CommonResponse(); + response.setSuccess(ret); + return response; + } + + @POST + @Path("/sync/ignoreDataInfoIds/remove") + @Produces(MediaType.APPLICATION_JSON) + public CommonResponse removeIgnoreDataInfoIds( + @FormParam("remoteDataCenter") String remoteDataCenter, + @FormParam("ignoreDataInfoIds") String ignoreDataInfoIds, + @FormParam("token") String token, + @FormParam("expectVersion") String expectVersion) { + if (!AuthChecker.authCheck(token)) { + LOG.error( + "remove ignoreDataInfoIds, remoteDataCenter={}, ignoreDataInfoIds={}, auth check={} fail!", + remoteDataCenter, + ignoreDataInfoIds, + token); + return GenericResponse.buildFailedResponse("auth check fail"); + } + if (StringUtils.isBlank(remoteDataCenter) + || StringUtils.isBlank(ignoreDataInfoIds) + || StringUtils.isBlank(expectVersion)) { + return CommonResponse.buildFailedResponse( + "remoteDataCenter, ignoreDataInfoIds, expectVersion is not allow empty."); + } + + MultiClusterSyncInfo exist = multiClusterSyncRepository.query(remoteDataCenter); + + if (exist == null || exist.getDataVersion() != Long.parseLong(expectVersion)) { + return CommonResponse.buildFailedResponse( + StringFormatter.format( + "remoteDataCenter:{}, expectVersion:{} not exist.", remoteDataCenter, expectVersion)); + } + + exist.getIgnoreDataInfoIds().removeAll(Sets.newHashSet(ignoreDataInfoIds.split(","))); + exist.setDataVersion(PersistenceDataBuilder.nextVersion()); + boolean ret = multiClusterSyncRepository.update(exist, NumberUtils.toLong(expectVersion)); + + LOG.info( + "[removeIgnoreDataInfoIds]result:{}, remoteDataCenter:{}, ignoreDataInfoIds:{}, expectVersion:{}", + ret, + remoteDataCenter, + ignoreDataInfoIds, + expectVersion); + + CommonResponse response = new CommonResponse(); + response.setSuccess(ret); + return response; + } } diff --git a/server/server/pom.xml b/server/server/pom.xml index ba00016bf..a2c832c5d 100644 --- a/server/server/pom.xml +++ b/server/server/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/session/pom.xml b/server/server/session/pom.xml index e3fc57229..cc2c3cb53 100644 --- a/server/server/session/pom.xml +++ b/server/server/session/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java index 04180cd10..151b8bf23 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java @@ -45,30 +45,9 @@ import com.alipay.sofa.registry.server.session.metadata.MetadataCacheRegistry; import com.alipay.sofa.registry.server.session.multi.cluster.DataCenterMetadataCache; import com.alipay.sofa.registry.server.session.multi.cluster.DataCenterMetadataCacheImpl; -import com.alipay.sofa.registry.server.session.node.service.ClientNodeService; -import com.alipay.sofa.registry.server.session.node.service.ClientNodeServiceImpl; -import com.alipay.sofa.registry.server.session.node.service.DataNodeService; -import com.alipay.sofa.registry.server.session.node.service.DataNodeServiceImpl; -import com.alipay.sofa.registry.server.session.node.service.MetaServerServiceImpl; -import com.alipay.sofa.registry.server.session.node.service.SessionMetaServerManager; -import com.alipay.sofa.registry.server.session.providedata.AppRevisionWriteSwitchService; -import com.alipay.sofa.registry.server.session.providedata.CompressPushService; -import com.alipay.sofa.registry.server.session.providedata.ConfigProvideDataWatcher; -import com.alipay.sofa.registry.server.session.providedata.FetchBlackListService; -import com.alipay.sofa.registry.server.session.providedata.FetchCircuitBreakerService; -import com.alipay.sofa.registry.server.session.providedata.FetchClientOffAddressService; -import com.alipay.sofa.registry.server.session.providedata.FetchDataInfoIDBlackListService; -import com.alipay.sofa.registry.server.session.providedata.FetchGrayPushSwitchService; -import com.alipay.sofa.registry.server.session.providedata.FetchPushEfficiencyConfigService; -import com.alipay.sofa.registry.server.session.providedata.FetchShutdownService; -import com.alipay.sofa.registry.server.session.providedata.FetchStopPushService; -import com.alipay.sofa.registry.server.session.providedata.ProvideDataProcessorManager; -import com.alipay.sofa.registry.server.session.push.ChangeProcessor; -import com.alipay.sofa.registry.server.session.push.FirePushService; -import com.alipay.sofa.registry.server.session.push.PushDataGenerator; -import com.alipay.sofa.registry.server.session.push.PushProcessor; -import com.alipay.sofa.registry.server.session.push.PushSwitchService; -import com.alipay.sofa.registry.server.session.push.WatchProcessor; +import com.alipay.sofa.registry.server.session.node.service.*; +import com.alipay.sofa.registry.server.session.providedata.*; +import com.alipay.sofa.registry.server.session.push.*; import com.alipay.sofa.registry.server.session.registry.Registry; import com.alipay.sofa.registry.server.session.registry.RegistryScanCallable; import com.alipay.sofa.registry.server.session.registry.SessionRegistry; @@ -77,65 +56,17 @@ import com.alipay.sofa.registry.server.session.remoting.DataNodeNotifyExchanger; import com.alipay.sofa.registry.server.session.remoting.console.SessionConsoleExchanger; import com.alipay.sofa.registry.server.session.remoting.console.handler.*; -import com.alipay.sofa.registry.server.session.remoting.handler.AppRevisionSliceHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.ClientNodeConnectionHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.DataChangeRequestHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.DataPushRequestHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.DataSlotDiffDigestRequestHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.DataSlotDiffPublisherRequestHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.GetRevisionPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.MetaRevisionHeartbeatPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.MetadataRegisterPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.NotifyProvideDataChangeHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.PublisherHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.PublisherPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.ServiceAppMappingPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.SubscriberHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.SubscriberPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.SyncConfigHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.SyncConfigPbHandler; -import com.alipay.sofa.registry.server.session.remoting.handler.WatcherHandler; -import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; -import com.alipay.sofa.registry.server.session.resource.ClientsOpenResource; -import com.alipay.sofa.registry.server.session.resource.CompressResource; -import com.alipay.sofa.registry.server.session.resource.ConnectionsResource; -import com.alipay.sofa.registry.server.session.resource.EmergencyApiResource; -import com.alipay.sofa.registry.server.session.resource.HealthResource; -import com.alipay.sofa.registry.server.session.resource.MetadataCacheResource; -import com.alipay.sofa.registry.server.session.resource.PersistenceClientManagerResource; -import com.alipay.sofa.registry.server.session.resource.SessionDigestResource; -import com.alipay.sofa.registry.server.session.resource.SessionOpenResource; -import com.alipay.sofa.registry.server.session.resource.SlotTableStatusResource; +import com.alipay.sofa.registry.server.session.remoting.handler.*; +import com.alipay.sofa.registry.server.session.resource.*; import com.alipay.sofa.registry.server.session.scheduler.timertask.CacheCountTask; import com.alipay.sofa.registry.server.session.scheduler.timertask.SessionCacheDigestTask; import com.alipay.sofa.registry.server.session.scheduler.timertask.SyncClientsHeartbeatTask; import com.alipay.sofa.registry.server.session.slot.SlotTableCache; import com.alipay.sofa.registry.server.session.slot.SlotTableCacheImpl; -import com.alipay.sofa.registry.server.session.store.DataStore; -import com.alipay.sofa.registry.server.session.store.FetchPubSubDataInfoIdService; -import com.alipay.sofa.registry.server.session.store.Interests; -import com.alipay.sofa.registry.server.session.store.SessionDataStore; -import com.alipay.sofa.registry.server.session.store.SessionInterests; -import com.alipay.sofa.registry.server.session.store.SessionWatchers; -import com.alipay.sofa.registry.server.session.store.Watchers; -import com.alipay.sofa.registry.server.session.strategy.AppRevisionHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.PublisherHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.SessionRegistryStrategy; -import com.alipay.sofa.registry.server.session.strategy.SubscriberHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.SyncConfigHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.WatcherHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultAppRevisionHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultPublisherHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSessionRegistryStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSubscriberHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSyncConfigHandlerStrategy; -import com.alipay.sofa.registry.server.session.strategy.impl.DefaultWatcherHandlerStrategy; -import com.alipay.sofa.registry.server.session.wrapper.AccessLimitWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.BlacklistWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.ClientCheckWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.ClientOffWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.DataInfoIDBlacklistWrapperInterceptor; -import com.alipay.sofa.registry.server.session.wrapper.WrapperInterceptorManager; +import com.alipay.sofa.registry.server.session.store.*; +import com.alipay.sofa.registry.server.session.strategy.*; +import com.alipay.sofa.registry.server.session.strategy.impl.*; +import com.alipay.sofa.registry.server.session.wrapper.*; import com.alipay.sofa.registry.server.shared.client.manager.BaseClientManagerService; import com.alipay.sofa.registry.server.shared.client.manager.ClientManagerService; import com.alipay.sofa.registry.server.shared.config.CommonConfig; @@ -566,6 +497,11 @@ public MetadataCacheResource metadataCacheResource() { public EmergencyApiResource emergencyApiResource() { return new EmergencyApiResource(); } + + @Bean + public PushEfficiencyConfigResource pushEfficiencyConfigResource() { + return new PushEfficiencyConfigResource(); + } } @Configuration diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java index 0f4542453..4ac87f951 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java @@ -21,10 +21,8 @@ import com.alipay.sofa.registry.log.Logger; import com.alipay.sofa.registry.log.LoggerFactory; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.session.push.ChangeProcessor; -import com.alipay.sofa.registry.server.session.push.FirePushService; +import com.alipay.sofa.registry.server.session.push.PushEfficiencyConfigUpdater; import com.alipay.sofa.registry.server.session.push.PushEfficiencyImproveConfig; -import com.alipay.sofa.registry.server.session.push.PushProcessor; import com.alipay.sofa.registry.server.shared.providedata.AbstractFetchSystemPropertyService; import com.alipay.sofa.registry.server.shared.providedata.SystemDataStorage; import com.alipay.sofa.registry.util.JsonUtils; @@ -43,10 +41,7 @@ public class FetchPushEfficiencyConfigService LoggerFactory.getLogger(FetchPushEfficiencyConfigService.class); @Autowired private SessionServerConfig sessionServerConfig; - @Autowired private ChangeProcessor changeProcessor; - @Autowired private PushProcessor pushProcessor; - - @Autowired private FirePushService firePushService; + @Autowired private PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater; public FetchPushEfficiencyConfigService() { super( @@ -87,11 +82,9 @@ protected boolean doProcess(SwitchStorage expect, ProvideData data) { if (!compareAndSet(expect, update)) { return false; } - changeProcessor.setWorkDelayTime(pushEfficiencyImproveConfig); - pushProcessor.setPushTaskDelayTime(pushEfficiencyImproveConfig); - if (firePushService.getRegProcessor() != null) { - firePushService.getRegProcessor().setWorkDelayTime(pushEfficiencyImproveConfig); - } + + this.pushEfficiencyConfigUpdater.updateFromProviderData(pushEfficiencyImproveConfig); + LOGGER.info( "Fetch PushEfficiencyImproveConfig success, prev={}, current={}", expect.pushEfficiencyImproveConfig, @@ -100,27 +93,16 @@ protected boolean doProcess(SwitchStorage expect, ProvideData data) { } @VisibleForTesting - public FetchPushEfficiencyConfigService setChangeProcessor(ChangeProcessor changeProcessor) { - this.changeProcessor = changeProcessor; - return this; - } - - @VisibleForTesting - public FetchPushEfficiencyConfigService setPushProcessor(PushProcessor pushProcessor) { - this.pushProcessor = pushProcessor; - return this; - } - - @VisibleForTesting - public FetchPushEfficiencyConfigService setFirePushService(FirePushService firePushService) { - this.firePushService = firePushService; + public FetchPushEfficiencyConfigService setSessionServerConfig( + SessionServerConfig sessionServerConfig) { + this.sessionServerConfig = sessionServerConfig; return this; } @VisibleForTesting - public FetchPushEfficiencyConfigService setSessionServerConfig( - SessionServerConfig sessionServerConfig) { - this.sessionServerConfig = sessionServerConfig; + public FetchPushEfficiencyConfigService setPushEfficiencyConfigUpdater( + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater) { + this.pushEfficiencyConfigUpdater = pushEfficiencyConfigUpdater; return this; } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java new file mode 100644 index 000000000..376c86419 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.builder.ToStringBuilder; + +/** + * @author huicha + * @date 2025/7/24 + */ +public class AutoPushEfficiencyConfig { + + private static final int DEFAULT_WINDOW_NUM = 6; + + private static final long DEFAULT_WINDOW_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10L); + + private static final long DEFAULT_PUSH_COUNT_THRESHOLD = 170000L; + + private static final int DEFAULT_DEBOUNCING_TIME_MAX = 1000; + + private static final int DEFAULT_DEBOUNCING_TIME_MIN = 100; + + private static final int DEFAULT_DEBOUNCING_TIME_STEP = 100; + + private static final int DEFAULT_MAX_DEBOUNCING_TIME_MAX = 3000; + + private static final int DEFAULT_MAX_DEBOUNCING_TIME_MIN = 1000; + + private static final int DEFAULT_MAX_DEBOUNCING_TIME_STEP = 200; + + private static final double DEFAULT_LOAD_THRESHOLD = 6; + + private boolean enableAutoPushEfficiency = false; + + private int windowNum = DEFAULT_WINDOW_NUM; + + private long windowTimeMillis = DEFAULT_WINDOW_TIME_MILLIS; + + private long pushCountThreshold = DEFAULT_PUSH_COUNT_THRESHOLD; + + // == 攒批时长参数 == + // 启动攒批时长的自动化调整 + private boolean enableDebouncingTime = false; + + // 攒批时长的最大值 + private int debouncingTimeMax = DEFAULT_DEBOUNCING_TIME_MAX; + + // 攒批时长的最小值 + private int debouncingTimeMin = DEFAULT_DEBOUNCING_TIME_MIN; + + // 调整攒批时长的步长 + private int debouncingTimeStep = DEFAULT_DEBOUNCING_TIME_STEP; + + // 启动最大攒批时长的自动化调整 + // 可以看下下面这个方法,最大攒批时长,和攒批时长是两个不同的指标 + // @see com.alipay.sofa.registry.server.session.push.ChangeProcessor.Worker.setChangeTaskWorkDelay + private boolean enableMaxDebouncingTime = false; + + // 最大攒批时长的最大值 + private int maxDebouncingTimeMax = DEFAULT_MAX_DEBOUNCING_TIME_MAX; + + // 最大攒批时长的最小值 + private int maxDebouncingTimeMin = DEFAULT_MAX_DEBOUNCING_TIME_MIN; + + // 最大调整攒批时长的步长 + private int maxDebouncingTimeStep = DEFAULT_MAX_DEBOUNCING_TIME_STEP; + // == 攒批时长参数 == + + // == 开关流限流参数 == + private boolean enableTrafficOperateLimitSwitch = false; + + private double loadThreshold = DEFAULT_LOAD_THRESHOLD; + // == 开关流限流参数 == + + public boolean isEnableAutoPushEfficiency() { + return enableAutoPushEfficiency; + } + + public void setEnableAutoPushEfficiency(boolean enableAutoPushEfficiency) { + this.enableAutoPushEfficiency = enableAutoPushEfficiency; + } + + public int getWindowNum() { + return windowNum; + } + + public void setWindowNum(int windowNum) { + this.windowNum = windowNum; + } + + public long getWindowTimeMillis() { + return windowTimeMillis; + } + + public void setWindowTimeMillis(long windowTimeMillis) { + this.windowTimeMillis = windowTimeMillis; + } + + public long getPushCountThreshold() { + return pushCountThreshold; + } + + public void setPushCountThreshold(long pushCountThreshold) { + this.pushCountThreshold = pushCountThreshold; + } + + public boolean isEnableDebouncingTime() { + return enableDebouncingTime; + } + + public void setEnableDebouncingTime(boolean enableDebouncingTime) { + this.enableDebouncingTime = enableDebouncingTime; + } + + public int getDebouncingTimeMax() { + return debouncingTimeMax; + } + + public void setDebouncingTimeMax(int debouncingTimeMax) { + this.debouncingTimeMax = debouncingTimeMax; + } + + public int getDebouncingTimeMin() { + return debouncingTimeMin; + } + + public void setDebouncingTimeMin(int debouncingTimeMin) { + this.debouncingTimeMin = debouncingTimeMin; + } + + public int getDebouncingTimeStep() { + return debouncingTimeStep; + } + + public void setDebouncingTimeStep(int debouncingTimeStep) { + this.debouncingTimeStep = debouncingTimeStep; + } + + public boolean isEnableMaxDebouncingTime() { + return enableMaxDebouncingTime; + } + + public void setEnableMaxDebouncingTime(boolean enableMaxDebouncingTime) { + this.enableMaxDebouncingTime = enableMaxDebouncingTime; + } + + public int getMaxDebouncingTimeMax() { + return maxDebouncingTimeMax; + } + + public void setMaxDebouncingTimeMax(int maxDebouncingTimeMax) { + this.maxDebouncingTimeMax = maxDebouncingTimeMax; + } + + public int getMaxDebouncingTimeMin() { + return maxDebouncingTimeMin; + } + + public void setMaxDebouncingTimeMin(int maxDebouncingTimeMin) { + this.maxDebouncingTimeMin = maxDebouncingTimeMin; + } + + public int getMaxDebouncingTimeStep() { + return maxDebouncingTimeStep; + } + + public void setMaxDebouncingTimeStep(int maxDebouncingTimeStep) { + this.maxDebouncingTimeStep = maxDebouncingTimeStep; + } + + public boolean isEnableTrafficOperateLimitSwitch() { + return enableTrafficOperateLimitSwitch; + } + + public double getLoadThreshold() { + return loadThreshold; + } + + public void setLoadThreshold(double loadThreshold) { + this.loadThreshold = loadThreshold; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java new file mode 100644 index 000000000..379a30ade --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.util.ConcurrentUtils; +import com.alipay.sofa.registry.util.LoopRunnable; +import com.google.common.annotations.VisibleForTesting; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 推送流控配置 + * + * @author huicha + * @date 2025/7/23 + */ +public class AutoPushEfficiencyRegulator extends LoopRunnable { + + private static final Logger LOGGER = LoggerFactory.getLogger("AUTO-PUSH-EFFICIENCY-REGULATOR"); + + private static final AtomicLong ID_GENERATOR = new AtomicLong(0); + + // 窗口的时长 (毫秒) + private final long windowTime; + + // 窗口数量 + private final int windowNum; + + // 窗口; 用于统计每个窗口内的推送次数 + private final AtomicLong[] windows; + + // 当前窗口的索引 + private final AtomicInteger index; + + // 阈值; 推送次数高于这个阈值的时候会开始逐渐调整攒批配置 + private final long pushCountThreshold; + + // 阈值; 用于调整开关流限流开关的 Load 阈值。 + // 不影响推送攒批配置 + private final double loadThreshold; + + // 预热次数,等到所有的窗口都轮换过一遍之后才能开始统计 + // 这里因为 warmupTimes 的值总是单线程读写的,因此没有加 volatile 关键字 + private int warmupTimes; + + // 唯一 ID + private final Long id; + + // 推送效率配置更新器 + private final PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater; + + // 采集系统负载指标 + private final OperatingSystemMXBean operatingSystemMXBean; + + // 攒批时长 + private final IntMetric debouncingTime; + + // 最大攒批时长 + private final IntMetric maxDebouncingTime; + + // 开关流限流 + // 因为命名为 enable traffic operate 在这里可能会有些歧义 + // 可能会有人理解成 load 过高时是否操作开关流限流,因此这里命名的比较长 + private final BooleanMetric trafficOperateLimitSwitch; + + public AutoPushEfficiencyRegulator( + PushEfficiencyImproveConfig pushEfficiencyImproveConfig, + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater) { + // 获取自适应攒批配置 + AutoPushEfficiencyConfig autoPushEfficiencyConfig = + pushEfficiencyImproveConfig.getAutoPushEfficiencyConfig(); + + // 初始化窗口相关配置 + this.windowTime = autoPushEfficiencyConfig.getWindowTimeMillis(); + this.windowNum = autoPushEfficiencyConfig.getWindowNum(); + this.windows = new AtomicLong[windowNum]; + for (int i = 0; i < windowNum; i++) { + this.windows[i] = new AtomicLong(0); + } + this.index = new AtomicInteger(0); + + // 设置其他参数 + this.id = ID_GENERATOR.incrementAndGet(); + this.pushCountThreshold = autoPushEfficiencyConfig.getPushCountThreshold(); + this.loadThreshold = autoPushEfficiencyConfig.getLoadThreshold(); + this.warmupTimes = 0; + this.pushEfficiencyConfigUpdater = pushEfficiencyConfigUpdater; + this.operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); + + // 初始化可能需要调整的指标 + this.debouncingTime = + new IntMetric( + autoPushEfficiencyConfig.isEnableDebouncingTime(), + pushEfficiencyImproveConfig.getChangeDebouncingMillis(), + autoPushEfficiencyConfig.getDebouncingTimeMax(), + autoPushEfficiencyConfig.getDebouncingTimeMin(), + autoPushEfficiencyConfig.getDebouncingTimeStep()); + this.maxDebouncingTime = + new IntMetric( + autoPushEfficiencyConfig.isEnableMaxDebouncingTime(), + pushEfficiencyImproveConfig.getChangeDebouncingMaxMillis(), + autoPushEfficiencyConfig.getMaxDebouncingTimeMax(), + autoPushEfficiencyConfig.getMaxDebouncingTimeMin(), + autoPushEfficiencyConfig.getMaxDebouncingTimeStep()); + this.trafficOperateLimitSwitch = + new BooleanMetric(autoPushEfficiencyConfig.isEnableTrafficOperateLimitSwitch()); + + // 启动定时任务 + ConcurrentUtils.createDaemonThread("AutoPushEfficiencyRegulator-" + this.id, this).start(); + } + + public void safeIncrementPushCount() { + try { + if (this.isClosed()) { + return; + } + int currentIndex = this.index.get(); + this.windows[currentIndex].incrementAndGet(); + } catch (Throwable throwable) { + LOGGER.error( + "[module=AutoPushEfficiencyRegulator][method=safeIncrementPushCount] increment push count exception", + throwable); + } + } + + /** 滚动窗口 */ + private void rollWindow() { + // 1. 获取当前窗口的索引 + int currentIndex = this.index.get(); + + // 2. 计算出下一个窗口的索引 + int newIndex = currentIndex + 1; + if (newIndex >= this.windowNum) { + newIndex = 0; + } + + // 3. 先清空下一个窗口的统计值,然后再更新索引 + this.windows[newIndex].set(0); + this.index.set(newIndex); + } + + private long computeTotalPushCount() { + long totalPushCount = 0; + for (int forIndex = 0; forIndex < this.windows.length; forIndex++) { + totalPushCount += this.windows[forIndex].get(); + } + return totalPushCount; + } + + private boolean checkPushCountIsHigh() { + long totalPushCount = this.computeTotalPushCount(); + return totalPushCount > this.pushCountThreshold; + } + + private void updateDebouncingTime(String tag) { + int debouncingTime = this.debouncingTime.load(); + int maxDebouncingTime = this.maxDebouncingTime.load(); + LOGGER.info( + "[ID: {}][{}] debouncingTime: {} maxDebouncingTime: {}", + this.id, + tag, + debouncingTime, + maxDebouncingTime); + this.pushEfficiencyConfigUpdater.updateDebouncingTime(debouncingTime, maxDebouncingTime); + } + + private void updateTrafficOperateLimitSwitch() { + boolean trafficOperateLimitSwitch = this.trafficOperateLimitSwitch.load(); + LOGGER.info("[ID: {}] trafficOperateLimitSwitch: {}", this.id, trafficOperateLimitSwitch); + this.pushEfficiencyConfigUpdater.updateTrafficOperateLimitSwitch(trafficOperateLimitSwitch); + } + + public Long getId() { + return id; + } + + @Override + public void runUnthrowable() { + if (this.isClosed()) { + // 如果任务已经被停止了,那么这个进程需要退出 + return; + } + + // 1. 检查是否所有的窗口都轮换过了,即是否完成了预热 + if (this.warmupTimes < this.windowNum) { + // 如果还没有,那么直接滚动窗口,不需要去检查推送频率 + this.rollWindow(); + this.warmupTimes++; + return; + } + + // 2. 已经完成预热了,检查推送频率是否过高 + boolean pushCountIsHigh = this.checkPushCountIsHigh(); + + // 3. 根据推送频率调整推送配置 + this.tryUpdatePushConfig(pushCountIsHigh); + + // 4. 根据推送频率以及负载情况调整开关流限流配置 + this.tryUpdateTrafficOperateLimitSwitch(pushCountIsHigh); + + // 3. 滚动窗口 + // 这里放到最后滚动窗口是因为: + // 滚动窗口时,会把最新的窗口计数清零,如果先滚动后检查推送频率, + // 那么感知到的推送频率就会偏小一点 + this.rollWindow(); + } + + private void tryUpdateTrafficOperateLimitSwitch(boolean pushCountIsHigh) { + if (!this.trafficOperateLimitSwitch.isEnable()) { + // 如果没有开启支持操作开关流,那么就不执行后续的代码了,尽量尝试避免获取系统负载 + return; + } + + // 这里获取到的是过去一分钟的负载平均值,这个值有可能小于 0,小于 0 时表示无法获取平均负载 + // 另外,这个方法的注释上写了这个方法设计上就会考虑可能较频繁调用,因此这里先不考虑做限制了 + double loadAverage = this.operatingSystemMXBean.getSystemLoadAverage(); + if (loadAverage < 0) { + return; + } + + boolean loadIsHigh = loadAverage > loadThreshold; + + if (pushCountIsHigh && loadIsHigh) { + if (this.trafficOperateLimitSwitch.tryTurnOn()) { + this.updateTrafficOperateLimitSwitch(); + } + } else { + if (this.trafficOperateLimitSwitch.tryTurnOff()) { + this.updateTrafficOperateLimitSwitch(); + } + } + } + + private void tryUpdatePushConfig(boolean pushCountIsHigh) { + if (pushCountIsHigh) { + // 推送频率过高,尝试更新攒批时长 + if (this.tryIncrementPushConfig()) { + this.updateDebouncingTime("Increment"); + } + } else { + // 推送频率正常,此时尝试逐渐降低攒批时长 + if (this.tryDecrementPushConfig()) { + this.updateDebouncingTime("Decrement"); + } + } + } + + private boolean tryIncrementPushConfig() { + boolean dataChange = false; + + if (debouncingTime.tryIncrement()) { + dataChange = true; + } + + if (maxDebouncingTime.tryIncrement()) { + dataChange = true; + } + + return dataChange; + } + + private boolean tryDecrementPushConfig() { + boolean dataChange = false; + + if (debouncingTime.tryDecrement()) { + dataChange = true; + } + + if (maxDebouncingTime.tryDecrement()) { + dataChange = true; + } + + return dataChange; + } + + @Override + public void waitingUnthrowable() { + ConcurrentUtils.sleepUninterruptibly(this.windowTime, TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + public int getWindowNum() { + return this.windowNum; + } + + @VisibleForTesting + public int getWindowsSize() { + return this.windows.length; + } + + @VisibleForTesting + public long getPushCountThreshold() { + return this.pushCountThreshold; + } +} + +class IntMetric { + + private final boolean enable; + + // 当不启用自适应攒批时,指标的默认值 + private final int defaultV; + + // 指标的最大值 + private final int max; + + // 指标的最小值 + private final int min; + + // 指标的步长 + private final int step; + + // 当前指标的值 + private int current; + + public IntMetric(boolean enable, int defaultV, int max, int min, int step) { + this.enable = enable; + this.defaultV = defaultV; + this.max = max; + this.min = min; + this.step = step; + this.current = min; + } + + public boolean tryIncrement() { + if (!this.enable) { + return false; + } + + if (this.current < this.max) { + int newValue = this.current + this.step; + if (newValue > this.max) { + newValue = this.max; + } + this.current = newValue; + return true; + } else { + return false; + } + } + + public boolean tryDecrement() { + if (!this.enable) { + return false; + } + if (this.current > this.min) { + int newValue = this.current - this.step; + if (newValue < this.min) { + newValue = this.min; + } + this.current = newValue; + return true; + } else { + return false; + } + } + + public boolean isEnable() { + return this.enable; + } + + public int load() { + if (this.enable) { + return this.current; + } else { + return this.defaultV; + } + } + + public int loadDefaultV() { + return this.defaultV; + } +} + +class BooleanMetric { + + private final boolean enable; + + private boolean current; + + public BooleanMetric(boolean enable) { + this.enable = enable; + this.current = false; + } + + public boolean tryTurnOn() { + if (!this.enable) { + return false; + } + if (this.current) { + return false; + } else { + this.current = true; + return true; + } + } + + public boolean tryTurnOff() { + if (!this.enable) { + return false; + } + if (this.current) { + this.current = false; + return true; + } else { + return false; + } + } + + public boolean isEnable() { + return this.enable; + } + + public boolean load() { + return this.current; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java new file mode 100644 index 000000000..9e2a6990e --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +/** + * @author huicha + * @date 2025/9/8 + */ +public class ChangeDebouncingTime { + + private int changeDebouncingMillis; + + private int changeDebouncingMaxMillis; + + public ChangeDebouncingTime() {} + + public ChangeDebouncingTime(int changeDebouncingMillis, int changeDebouncingMaxMillis) { + this.changeDebouncingMillis = changeDebouncingMillis; + this.changeDebouncingMaxMillis = changeDebouncingMaxMillis; + } + + public int getChangeDebouncingMillis() { + return changeDebouncingMillis; + } + + public void setChangeDebouncingMillis(int changeDebouncingMillis) { + this.changeDebouncingMillis = changeDebouncingMillis; + } + + public int getChangeDebouncingMaxMillis() { + return changeDebouncingMaxMillis; + } + + public void setChangeDebouncingMaxMillis(int changeDebouncingMaxMillis) { + this.changeDebouncingMaxMillis = changeDebouncingMaxMillis; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java index 4f6f3efc5..7a6186715 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java @@ -23,11 +23,7 @@ import com.alipay.sofa.registry.util.StringFormatter; import com.alipay.sofa.registry.util.WakeUpLoopRunnable; import com.google.common.collect.Maps; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; @@ -68,6 +64,39 @@ public void setWorkDelayTime(PushEfficiencyImproveConfig pushEfficiencyImproveCo } } + public Map getChangeDebouncingMillis() { + Map dcChangeDebouncingTimes = + new HashMap<>(dataCenterWorkers.size()); + for (Map.Entry entry : dataCenterWorkers.entrySet()) { + String dataCenter = entry.getKey(); + Worker[] workers = entry.getValue(); + if (workers == null) { + dcChangeDebouncingTimes.put(dataCenter, new ChangeDebouncingTime[0]); + continue; + } + ChangeDebouncingTime[] changeDebouncingTimes = new ChangeDebouncingTime[workers.length]; + for (int index = 0; index < workers.length; index++) { + Worker worker = workers[index]; + ChangeDebouncingTime changeDebouncingTime = worker.getChangeDebouncingTime(); + changeDebouncingTimes[index] = changeDebouncingTime; + } + dcChangeDebouncingTimes.put(dataCenter, changeDebouncingTimes); + } + return dcChangeDebouncingTimes; + } + + public void setChangeDebouncingMillis(int changeDebouncingMillis, int changeDebouncingMaxMillis) { + for (Map.Entry entry : dataCenterWorkers.entrySet()) { + Worker[] workers = entry.getValue(); + if (workers == null) { + return; + } + for (Worker work : workers) { + work.setChangeDebouncingMillis(changeDebouncingMillis, changeDebouncingMaxMillis); + } + } + } + boolean fireChange(String dataInfoId, ChangeHandler handler, TriggerPushContext changeCtx) { ChangeKey key = new ChangeKey(changeCtx.dataCenters(), dataInfoId); Worker worker = workerOf(key); @@ -121,8 +150,14 @@ public void setChangeTaskWorkDelay(PushEfficiencyImproveConfig pushEfficiencyImp this.changeTaskWaitingMillis = pushEfficiencyImproveConfig.getChangeTaskWaitingMillis(); } - int changeDebouncingMillis; - int changeDebouncingMaxMillis; + public void setChangeDebouncingMillis( + int changeDebouncingMillis, int changeDebouncingMaxMillis) { + this.changeDebouncingMillis = changeDebouncingMillis; + this.changeDebouncingMaxMillis = changeDebouncingMaxMillis; + } + + volatile int changeDebouncingMillis; + volatile int changeDebouncingMaxMillis; int changeTaskWaitingMillis = 100; Worker(int changeDebouncingMillis, int changeDebouncingMaxMillis) { @@ -205,6 +240,10 @@ public void runUnthrowable() { public int getWaitingMillis() { return changeTaskWaitingMillis; } + + public ChangeDebouncingTime getChangeDebouncingTime() { + return new ChangeDebouncingTime(this.changeDebouncingMillis, this.changeDebouncingMaxMillis); + } } static final class ChangeKey { diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java new file mode 100644 index 000000000..1fa7f4842 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdater.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.SmartLifecycle; +import org.springframework.stereotype.Component; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author huicha + * @date 2025/7/24 + */ +@Component +public class PushEfficiencyConfigUpdater implements SmartLifecycle { + + private static final Logger LOGGER = LoggerFactory.getLogger(PushEfficiencyConfigUpdater.class); + + @Autowired private ChangeProcessor changeProcessor; + + @Autowired private PushProcessor pushProcessor; + + @Autowired private FirePushService firePushService; + + @Autowired private ClientManagerResource clientManagerResource; + + private Lock lock; + + private boolean stop; + + private boolean useAutoPushEfficiency = false; + + private AutoPushEfficiencyRegulator autoPushEfficiencyRegulator; + + public PushEfficiencyConfigUpdater() { + this.lock = new ReentrantLock(); + this.stop = false; + } + + /** + * ProviderData 中关于推送相关的配置发生了变化 + * + * @param pushEfficiencyImproveConfig + */ + public void updateFromProviderData(PushEfficiencyImproveConfig pushEfficiencyImproveConfig) { + this.lock.lock(); + try { + LOGGER.info( + "[PushEfficiencyConfigUpdater] update config from provider data: {}", + pushEfficiencyImproveConfig); + + if (this.stop) { + // 已销毁 + return; + } + + AutoPushEfficiencyConfig autoPushEfficiencyConfig = + pushEfficiencyImproveConfig.getAutoPushEfficiencyConfig(); + if (null != autoPushEfficiencyConfig + && autoPushEfficiencyConfig.isEnableAutoPushEfficiency()) { + // 新的配置中,开启了自动化配置 + this.useAutoPushEfficiency = true; + + if (null != this.autoPushEfficiencyRegulator) { + // 此时还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉 + LOGGER.info( + "[PushEfficiencyConfigUpdater] close old auto push efficiency regulator, id: {}, will create new one", + this.autoPushEfficiencyRegulator.getId()); + this.autoPushEfficiencyRegulator.close(); + } + + // 这里需要调整下初始配置的值 + if (autoPushEfficiencyConfig.isEnableDebouncingTime()) { + // 当自适应攒批需要调整 debouncing time 的时候,需要将 debouncing time 的初始值设置为 min + pushEfficiencyImproveConfig.setChangeDebouncingMillis( + autoPushEfficiencyConfig.getDebouncingTimeMin()); + } + + if (autoPushEfficiencyConfig.isEnableMaxDebouncingTime()) { + // 当自适应攒批需要调整 max debouncing time 的时候,需要将 debouncing time 的初始值设置为 min + pushEfficiencyImproveConfig.setChangeDebouncingMaxMillis( + autoPushEfficiencyConfig.getMaxDebouncingTimeMin()); + } + + this.autoPushEfficiencyRegulator = + new AutoPushEfficiencyRegulator(pushEfficiencyImproveConfig, this); + } else { + // 新的配置中,关闭了自动化配置,此时如果还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉 + this.useAutoPushEfficiency = false; + + if (null != this.autoPushEfficiencyRegulator) { + LOGGER.info( + "[PushEfficiencyConfigUpdater] close old auto push efficiency regulator, id: {}, will not create new one", + this.autoPushEfficiencyRegulator.getId()); + this.autoPushEfficiencyRegulator.close(); + } + + this.autoPushEfficiencyRegulator = null; + } + + // 更新一下 PushProcessor 中的 AutoPushEfficiencyRegulator,以便于统计推送次数 + this.pushProcessor.setAutoPushEfficiencyRegulator(this.autoPushEfficiencyRegulator); + + // 更新配置 + this.changeProcessor.setWorkDelayTime(pushEfficiencyImproveConfig); + this.pushProcessor.setPushTaskDelayTime(pushEfficiencyImproveConfig); + if (this.firePushService.getRegProcessor() != null) { + this.firePushService.getRegProcessor().setWorkDelayTime(pushEfficiencyImproveConfig); + } + + // 无论如何,先关闭掉限流 + this.clientManagerResource.setEnableTrafficOperate(true); + } finally { + this.lock.unlock(); + } + } + + public void updateDebouncingTime(int debouncingTime, int maxDebouncingTime) { + this.lock.lock(); + try { + if (!this.useAutoPushEfficiency) { + // 如果已经停止使用自动化配置了,那么这里就跳过更新,以防止最终实际使用的配置不是 ProvideData 中的配置 + return; + } + this.changeProcessor.setChangeDebouncingMillis(debouncingTime, maxDebouncingTime); + } finally { + this.lock.unlock(); + } + } + + public void updateTrafficOperateLimitSwitch(boolean trafficOperateLimitSwitch) { + this.lock.lock(); + try { + if (!this.useAutoPushEfficiency) { + // 如果已经停止使用自动化配置了,那么这里就跳过更新,以防止最终实际使用的配置不是 ProvideData 中的配置 + return; + } + + // 打开限制开关,意味着开启了限流,也就是不允许操作开关流,因此这里是反的 + this.clientManagerResource.setEnableTrafficOperate(!trafficOperateLimitSwitch); + } finally { + this.lock.unlock(); + } + } + + @Override + public void start() {} + + @Override + public void stop() { + // Bean 被销毁的时候需要清理释放线程资源 + this.lock.lock(); + try { + if (!this.stop) { + this.stop = true; + if (null != this.autoPushEfficiencyRegulator) { + this.autoPushEfficiencyRegulator.close(); + } + } + } finally { + this.lock.unlock(); + } + } + + @Override + public boolean isRunning() { + this.lock.lock(); + try { + return this.stop; + } finally { + this.lock.unlock(); + } + } + + @VisibleForTesting + public AutoPushEfficiencyRegulator getAutoPushEfficiencyRegulator() { + return autoPushEfficiencyRegulator; + } + + @VisibleForTesting + public void setChangeProcessor(ChangeProcessor changeProcessor) { + this.changeProcessor = changeProcessor; + } + + @VisibleForTesting + public void setPushProcessor(PushProcessor pushProcessor) { + this.pushProcessor = pushProcessor; + } + + @VisibleForTesting + public void setFirePushService(FirePushService firePushService) { + this.firePushService = firePushService; + } + + @VisibleForTesting + public void setClientManagerResource(ClientManagerResource clientManagerResource) { + this.clientManagerResource = clientManagerResource; + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java index a897c5f72..7a4e17fd4 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.ToStringBuilder; /** * @author jiangcun.hlc@antfin.com @@ -80,6 +81,9 @@ public class PushEfficiencyImproveConfig { /** session 处理 pushTask delay pushTaskDebouncingMillis 时间处理,可以合并相同的推送任务,避免数据连续变化触发大量推送, 默认500ms */ private int sbfAppPushTaskDebouncingMillis = DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS; + /** 自动优化的相关配置 */ + private AutoPushEfficiencyConfig autoPushEfficiencyConfig = null; + /** * 判断是否满足 三板斧灰度条件 * @@ -251,6 +255,14 @@ public void setRegWorkWake(boolean regWorkWake) { this.regWorkWake = regWorkWake; } + public AutoPushEfficiencyConfig getAutoPushEfficiencyConfig() { + return autoPushEfficiencyConfig; + } + + public void setAutoPushEfficiencyConfig(AutoPushEfficiencyConfig autoPushEfficiencyConfig) { + this.autoPushEfficiencyConfig = autoPushEfficiencyConfig; + } + public void setSessionServerConfig(SessionServerConfig sessionServerConfig) { if (null != sessionServerConfig && StringUtils.isNotBlank(sessionServerConfig.getSessionServerRegion())) { @@ -272,44 +284,6 @@ public boolean validate() { @Override public String toString() { - return "PushEfficiencyImproveConfig{" - + "CURRENT_ZONE='" - + CURRENT_ZONE - + '\'' - + ", CURRENT_IP=" - + CURRENT_IP - + ", inIpZoneSBF=" - + inIpZoneSBF() - + ", DEFAULT_CHANGE_DEBOUNCING_MILLIS=" - + DEFAULT_CHANGE_DEBOUNCING_MILLIS - + ", DEFAULT_CHANGE_DEBOUNCING_MAX_MILLIS=" - + DEFAULT_CHANGE_DEBOUNCING_MAX_MILLIS - + ", DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS=" - + DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS - + ", changeDebouncingMillis=" - + changeDebouncingMillis - + ", changeDebouncingMaxMillis=" - + changeDebouncingMaxMillis - + ", changeTaskWaitingMillis=" - + changeTaskWaitingMillis - + ", pushTaskWaitingMillis=" - + pushTaskWaitingMillis - + ", pushTaskDebouncingMillis=" - + pushTaskDebouncingMillis - + ", regWorkWaitingMillis=" - + regWorkWaitingMillis - + ", ipSet=" - + ipSet - + ", zoneSet=" - + zoneSet - + ", subAppSet=" - + subAppSet - + ", sbfAppPushTaskDebouncingMillis=" - + sbfAppPushTaskDebouncingMillis - + ", pushTaskWake=" - + pushTaskWake - + ", regWorkWake=" - + regWorkWake - + '}'; + return ToStringBuilder.reflectionToString(this); } } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java index 6f4ac35a2..d287269f1 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java @@ -67,7 +67,9 @@ public class PushProcessor { @Autowired protected ClientNodeService clientNodeService; - @Autowired protected CircuitBreakerService circuitBreakerService;; + @Autowired protected CircuitBreakerService circuitBreakerService; + + private volatile AutoPushEfficiencyRegulator autoPushEfficiencyRegulator; private int pushDataTaskDebouncingMillis = 500; private PushEfficiencyImproveConfig pushEfficiencyImproveConfig; @@ -97,6 +99,11 @@ public void setPushTaskDelayTime(PushEfficiencyImproveConfig pushEfficiencyImpro this.pushEfficiencyImproveConfig = pushEfficiencyImproveConfig; } + public void setAutoPushEfficiencyRegulator( + AutoPushEfficiencyRegulator autoPushEfficiencyRegulator) { + this.autoPushEfficiencyRegulator = autoPushEfficiencyRegulator; + } + void intTaskBuffer() { if (this.taskBuffer == null) { this.taskBuffer = new PushTaskBuffer(sessionServerConfig.getPushTaskBufferBucketSize()); @@ -340,6 +347,13 @@ boolean doPush(PushTask task) { return false; } + // 如果需要则进行推送计数,以便于可以自动化调整攒批 + AutoPushEfficiencyRegulator currentAutoPushEfficiencyRegulator = + this.autoPushEfficiencyRegulator; + if (null != currentAutoPushEfficiencyRegulator) { + currentAutoPushEfficiencyRegulator.safeIncrementPushCount(); + } + pushingRecords.put( task.pushingTaskKey, new PushRecord( diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java index b9fde3322..2d429abca 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java @@ -78,6 +78,8 @@ public class ClientManagerResource { @Autowired protected ExecutorManager executorManager; + private volatile boolean enableTrafficOperate = true; + /** * Client off * @@ -90,6 +92,12 @@ public CommonResponse clientOff(@FormParam("ips") String ips) { if (StringUtils.isEmpty(ips)) { return CommonResponse.buildFailedResponse("ips is empty"); } + + if (!this.enableTrafficOperate) { + // 限流,不允许操作开关流 + return CommonResponse.buildFailedResponse("too many request"); + } + final Set ipSet = CollectionSdks.toIpSet(ips); List conIds = connectionsService.getIpConnects(ipSet); sessionRegistry.clientOff(conIds); @@ -109,6 +117,12 @@ public CommonResponse clientOn(@FormParam("ips") String ips) { if (StringUtils.isEmpty(ips)) { return CommonResponse.buildFailedResponse("ips is empty"); } + + if (!this.enableTrafficOperate) { + // 限流,不允许操作开关流 + return CommonResponse.buildFailedResponse("too many request"); + } + final List ipList = CollectionSdks.toIpList(ips); List conIds = connectionsService.closeIpConnects(ipList); LOGGER.info("clientOn ips={}, conIds={}", ips, conIds); @@ -128,6 +142,12 @@ public CommonResponse clientOffInZone(@FormParam("ips") String ips) { if (StringUtils.isEmpty(ips)) { return CommonResponse.buildFailedResponse("ips is empty"); } + + if (!this.enableTrafficOperate) { + // 限流,不允许操作开关流 + return CommonResponse.buildFailedResponse("too many request"); + } + CommonResponse resp = clientOff(ips); if (!resp.isSuccess()) { return resp; @@ -164,6 +184,12 @@ public CommonResponse clientOnInZone(@FormParam("ips") String ips) { if (StringUtils.isEmpty(ips)) { return CommonResponse.buildFailedResponse("ips is empty"); } + + if (!this.enableTrafficOperate) { + // 限流,不允许操作开关流 + return CommonResponse.buildFailedResponse("too many request"); + } + CommonResponse resp = clientOn(ips); if (!resp.isSuccess()) { return resp; @@ -238,4 +264,12 @@ public Map connectionMapper() { public List getOtherConsoleServersCurrentZone() { return Sdks.getOtherConsoleServers(null, sessionServerConfig, metaServerService); } + + public boolean isEnableTrafficOperate() { + return enableTrafficOperate; + } + + public void setEnableTrafficOperate(boolean enableTrafficOperate) { + this.enableTrafficOperate = enableTrafficOperate; + } } diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java new file mode 100644 index 000000000..93bb494f4 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/PushEfficiencyConfigResource.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.resource; + +import com.alipay.sofa.registry.common.model.GenericResponse; +import com.alipay.sofa.registry.server.session.push.ChangeDebouncingTime; +import com.alipay.sofa.registry.server.session.push.ChangeProcessor; +import com.alipay.sofa.registry.server.shared.resource.AuthChecker; +import java.util.Map; +import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author huicha + * @date 2025/9/8 + */ +@Path("api/push/efficiency") +public class PushEfficiencyConfigResource { + + private Logger LOGGER = LoggerFactory.getLogger(PushEfficiencyConfigResource.class); + + @Autowired private ChangeProcessor changeProcessor; + + @GET + @Path("/getChangeDebouncingMillis") + @Produces(MediaType.APPLICATION_JSON) + public GenericResponse> getChangeDebouncingMillis( + @HeaderParam("token") String token) { + try { + if (!AuthChecker.authCheck(token)) { + LOGGER.error( + "[module=PushEfficiencyConfigResource][method=getChangeDebouncingMillis] auth check={} fail!", + token); + return new GenericResponse().fillFailed("auth check fail"); + } + + Map changeDebouncingTimes = + this.changeProcessor.getChangeDebouncingMillis(); + return new GenericResponse().fillSucceed(changeDebouncingTimes); + } catch (Throwable throwable) { + LOGGER.error( + "[module=PushEfficiencyConfigResource][method=getChangeDebouncingMillis] getChangeDebouncingMillis exception", + throwable); + return new GenericResponse().fillFailed("getChangeDebouncingMillis exception"); + } + } +} diff --git a/server/server/session/src/main/resources/log4j2.xml b/server/server/session/src/main/resources/log4j2.xml index c858270d8..3effc1855 100644 --- a/server/server/session/src/main/resources/log4j2.xml +++ b/server/server/session/src/main/resources/log4j2.xml @@ -841,6 +841,21 @@ + + + + + + + + + + + + + + @@ -1121,13 +1136,18 @@ + + + + + + - diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigServiceTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigServiceTest.java index 1f37b1ccd..d59737fb4 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigServiceTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigServiceTest.java @@ -22,9 +22,7 @@ import com.alipay.sofa.registry.common.model.constants.ValueConstants; import com.alipay.sofa.registry.common.model.metaserver.ProvideData; import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig; -import com.alipay.sofa.registry.server.session.push.ChangeProcessor; -import com.alipay.sofa.registry.server.session.push.FirePushService; -import com.alipay.sofa.registry.server.session.push.PushProcessor; +import com.alipay.sofa.registry.server.session.push.PushEfficiencyConfigUpdater; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,9 +39,7 @@ public class FetchPushEfficiencyConfigServiceTest { public void beforeTest() { fetchPushEfficiencyConfigService = new FetchPushEfficiencyConfigService(); fetchPushEfficiencyConfigService - .setPushProcessor(mock(PushProcessor.class)) - .setChangeProcessor(mock(ChangeProcessor.class)) - .setFirePushService(mock(FirePushService.class)) + .setPushEfficiencyConfigUpdater(mock(PushEfficiencyConfigUpdater.class)) .setSessionServerConfig(mock(SessionServerConfig.class)); } diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java new file mode 100644 index 000000000..62cb09b6a --- /dev/null +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulatorTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import java.util.Collections; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * @author huicha + * @date 2025/7/24 + */ +public class AutoPushEfficiencyRegulatorTest { + + @Test + public void testUpdateDebouncingTimeAndMaxDebouncingTime() throws InterruptedException { + // 开启自动化配置 + AutoPushEfficiencyConfig autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); + autoPushEfficiencyConfig.setEnableDebouncingTime(true); + autoPushEfficiencyConfig.setEnableMaxDebouncingTime(true); + + // 总推送数超过 10 笔就触发增加攒批时长 + autoPushEfficiencyConfig.setPushCountThreshold(10); + + autoPushEfficiencyConfig.setWindowNum(10); + autoPushEfficiencyConfig.setWindowTimeMillis(100); + + PushEfficiencyImproveConfig pushEfficiencyImproveConfig = new PushEfficiencyImproveConfig(); + pushEfficiencyImproveConfig.setAutoPushEfficiencyConfig(autoPushEfficiencyConfig); + + MockPushEfficiencyConfigUpdater mockPushEfficiencyConfigUpdater = + new MockPushEfficiencyConfigUpdater(); + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater = + Mockito.mock(PushEfficiencyConfigUpdater.class, mockPushEfficiencyConfigUpdater); + + AutoPushEfficiencyRegulator autoPushEfficiencyRegulator = + new AutoPushEfficiencyRegulator(pushEfficiencyImproveConfig, pushEfficiencyConfigUpdater); + + try { + for (int loop = 0; loop < 40; loop++) { + for (int i = 0; i < 5; i++) { + autoPushEfficiencyRegulator.safeIncrementPushCount(); + } + Thread.sleep(50); + } + + int debouncingTime = mockPushEfficiencyConfigUpdater.getDebouncingTime(); + int maxDebouncingTime = mockPushEfficiencyConfigUpdater.getMaxDebouncingTime(); + Assert.assertEquals( + debouncingTime, 1000 /*AutoPushEfficiencyConfig.DEFAULT_DEBOUNCING_TIME_MAX*/); + Assert.assertEquals( + maxDebouncingTime, 3000 /*AutoPushEfficiencyConfig.DEFAULT_MAX_DEBOUNCING_TIME_MAX*/); + + Thread.sleep(2000); + + debouncingTime = mockPushEfficiencyConfigUpdater.getDebouncingTime(); + maxDebouncingTime = mockPushEfficiencyConfigUpdater.getMaxDebouncingTime(); + Assert.assertEquals( + debouncingTime, 100 /*AutoPushEfficiencyConfig.DEFAULT_DEBOUNCING_TIME_MIN*/); + Assert.assertEquals( + maxDebouncingTime, 1000 /*AutoPushEfficiencyConfig.DEFAULT_MAX_DEBOUNCING_TIME_MIN*/); + } finally { + autoPushEfficiencyRegulator.close(); + } + } + + @Test + public void testOnlyUpdateDebouncingTime() throws InterruptedException { + // 开启自动化配置 + AutoPushEfficiencyConfig autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); + autoPushEfficiencyConfig.setEnableDebouncingTime(true); + + // 不自适应调整 max debouncing time,但是这里设置单独的初始配置 + // 预期这个初始配置不生效 + autoPushEfficiencyConfig.setEnableMaxDebouncingTime(false); + autoPushEfficiencyConfig.setMaxDebouncingTimeMin(10); + autoPushEfficiencyConfig.setMaxDebouncingTimeMax(100); + autoPushEfficiencyConfig.setMaxDebouncingTimeStep(10); + + // 总推送数超过 10 笔就触发增加攒批时长 + autoPushEfficiencyConfig.setPushCountThreshold(10); + + autoPushEfficiencyConfig.setWindowNum(10); + autoPushEfficiencyConfig.setWindowTimeMillis(100); + + PushEfficiencyImproveConfig pushEfficiencyImproveConfig = new PushEfficiencyImproveConfig(); + pushEfficiencyImproveConfig.setZoneSet(Collections.singleton("ALL_ZONE")); + pushEfficiencyImproveConfig.setAutoPushEfficiencyConfig(autoPushEfficiencyConfig); + + // 设置一个特别的 max debouncing time,预期后面自适应调整攒批配置的时候,max debouncing time 始终为 15000 + pushEfficiencyImproveConfig.setChangeDebouncingMaxMillis(15000); + + MockPushEfficiencyConfigUpdater mockPushEfficiencyConfigUpdater = + new MockPushEfficiencyConfigUpdater(); + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater = + Mockito.mock(PushEfficiencyConfigUpdater.class, mockPushEfficiencyConfigUpdater); + + AutoPushEfficiencyRegulator autoPushEfficiencyRegulator = + new AutoPushEfficiencyRegulator(pushEfficiencyImproveConfig, pushEfficiencyConfigUpdater); + + try { + for (int loop = 0; loop < 40; loop++) { + for (int i = 0; i < 5; i++) { + autoPushEfficiencyRegulator.safeIncrementPushCount(); + } + Thread.sleep(50); + } + + int debouncingTime = mockPushEfficiencyConfigUpdater.getDebouncingTime(); + int maxDebouncingTime = mockPushEfficiencyConfigUpdater.getMaxDebouncingTime(); + Assert.assertEquals( + debouncingTime, 1000 /*AutoPushEfficiencyConfig.DEFAULT_DEBOUNCING_TIME_MAX*/); + Assert.assertEquals( + maxDebouncingTime, 15000 /*AutoPushEfficiencyConfig.DEFAULT_MAX_DEBOUNCING_TIME_MAX*/); + + Thread.sleep(2000); + + debouncingTime = mockPushEfficiencyConfigUpdater.getDebouncingTime(); + maxDebouncingTime = mockPushEfficiencyConfigUpdater.getMaxDebouncingTime(); + Assert.assertEquals( + debouncingTime, 100 /*AutoPushEfficiencyConfig.DEFAULT_DEBOUNCING_TIME_MIN*/); + Assert.assertEquals( + maxDebouncingTime, 15000 /*AutoPushEfficiencyConfig.DEFAULT_MAX_DEBOUNCING_TIME_MIN*/); + } finally { + autoPushEfficiencyRegulator.close(); + } + } +} + +class MockPushEfficiencyConfigUpdater implements Answer { + + private int debouncingTime; + + private int maxDebouncingTime; + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + this.debouncingTime = invocation.getArgumentAt(0, Integer.class); + this.maxDebouncingTime = invocation.getArgumentAt(1, Integer.class); + return null; + } + + public int getDebouncingTime() { + return debouncingTime; + } + + public void setDebouncingTime(int debouncingTime) { + this.debouncingTime = debouncingTime; + } + + public int getMaxDebouncingTime() { + return maxDebouncingTime; + } + + public void setMaxDebouncingTime(int maxDebouncingTime) { + this.maxDebouncingTime = maxDebouncingTime; + } +} diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java new file mode 100644 index 000000000..dd8d80ee1 --- /dev/null +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyConfigUpdaterTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.registry.server.session.push; + +import com.alipay.sofa.registry.server.session.resource.ClientManagerResource; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * @author huicha + * @date 2025/7/24 + */ +public class PushEfficiencyConfigUpdaterTest { + + @Test + public void testUpdateFromProviderData() { + ChangeProcessor changeProcessor = Mockito.mock(ChangeProcessor.class); + PushProcessor pushProcessor = Mockito.mock(PushProcessor.class); + FirePushService firePushService = Mockito.mock(FirePushService.class); + ClientManagerResource clientManagerResource = Mockito.mock(ClientManagerResource.class); + + PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater = new PushEfficiencyConfigUpdater(); + pushEfficiencyConfigUpdater.setChangeProcessor(changeProcessor); + pushEfficiencyConfigUpdater.setPushProcessor(pushProcessor); + pushEfficiencyConfigUpdater.setFirePushService(firePushService); + pushEfficiencyConfigUpdater.setClientManagerResource(clientManagerResource); + + // 更新没有开启自动化配置,因此预期是 null + pushEfficiencyConfigUpdater.updateFromProviderData(new PushEfficiencyImproveConfig()); + + AutoPushEfficiencyRegulator autoPushEfficiencyRegulator = + pushEfficiencyConfigUpdater.getAutoPushEfficiencyRegulator(); + Assert.assertNull(autoPushEfficiencyRegulator); + + // 第二次开启自动化配置 + AutoPushEfficiencyConfig autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); + autoPushEfficiencyConfig.setEnableDebouncingTime(true); + autoPushEfficiencyConfig.setEnableMaxDebouncingTime(true); + autoPushEfficiencyConfig.setWindowTimeMillis(10); + + PushEfficiencyImproveConfig pushEfficiencyImproveConfig = new PushEfficiencyImproveConfig(); + pushEfficiencyImproveConfig.setAutoPushEfficiencyConfig(autoPushEfficiencyConfig); + + pushEfficiencyConfigUpdater.updateFromProviderData(pushEfficiencyImproveConfig); + + autoPushEfficiencyRegulator = pushEfficiencyConfigUpdater.getAutoPushEfficiencyRegulator(); + Assert.assertNotNull(autoPushEfficiencyRegulator); + + // 第三次仍然开启,但是我们修改一部分配置 + autoPushEfficiencyConfig = new AutoPushEfficiencyConfig(); + autoPushEfficiencyConfig.setEnableAutoPushEfficiency(true); + autoPushEfficiencyConfig.setEnableDebouncingTime(true); + autoPushEfficiencyConfig.setEnableMaxDebouncingTime(true); + autoPushEfficiencyConfig.setWindowTimeMillis(10); + autoPushEfficiencyConfig.setWindowNum(3); + autoPushEfficiencyConfig.setPushCountThreshold(10); + + pushEfficiencyImproveConfig = new PushEfficiencyImproveConfig(); + pushEfficiencyImproveConfig.setAutoPushEfficiencyConfig(autoPushEfficiencyConfig); + + pushEfficiencyConfigUpdater.updateFromProviderData(pushEfficiencyImproveConfig); + + AutoPushEfficiencyRegulator newAutoPushEfficiencyRegulator = + pushEfficiencyConfigUpdater.getAutoPushEfficiencyRegulator(); + Assert.assertNotNull(newAutoPushEfficiencyRegulator); + + int windowNum = newAutoPushEfficiencyRegulator.getWindowNum(); + int windowSize = newAutoPushEfficiencyRegulator.getWindowsSize(); + long pushCountThreshold = newAutoPushEfficiencyRegulator.getPushCountThreshold(); + + Assert.assertEquals(3, windowNum); + Assert.assertEquals(3, windowSize); + Assert.assertEquals(10, pushCountThreshold); + + // 此时之前的 AutoPushEfficiencyRegulator 应当为关闭状态 + Assert.assertTrue(autoPushEfficiencyRegulator.isClosed()); + + // 清理释放线程资源 + pushEfficiencyConfigUpdater.stop(); + + // 新的 AutoPushEfficiencyRegulator 也应当为关闭状态 + Assert.assertTrue(newAutoPushEfficiencyRegulator.isClosed()); + } +} diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/SessionInterestsTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/SessionInterestsTest.java index 02406e9e3..65f9cdbdb 100644 --- a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/SessionInterestsTest.java +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/store/SessionInterestsTest.java @@ -16,19 +16,14 @@ */ package com.alipay.sofa.registry.server.session.store; -import com.alipay.sofa.registry.common.model.ElementType; import com.alipay.sofa.registry.common.model.dataserver.DatumVersion; import com.alipay.sofa.registry.common.model.sessionserver.SubscriberCountByApp; -import com.alipay.sofa.registry.common.model.store.BaseInfo; import com.alipay.sofa.registry.common.model.store.DataInfo; import com.alipay.sofa.registry.common.model.store.Subscriber; -import com.alipay.sofa.registry.common.model.store.URL; -import com.alipay.sofa.registry.core.model.ScopeEnum; import com.alipay.sofa.registry.server.session.AbstractSessionServerTestBase; import com.alipay.sofa.registry.server.session.registry.SessionRegistry.SelectSubscriber; import com.google.common.collect.Maps; import com.google.common.collect.Sets; - import java.util.*; import java.util.Map.Entry; import org.junit.Assert; @@ -221,7 +216,8 @@ public void testGetInterestsByOption() { String testOptionDataId = "test-option-data-id"; String testOptionGroup = "test-option-group"; String testOptionInstanceId = "test-option-instance-id"; - String testOptionDataInfoId = DataInfo.toDataInfoId(testOptionDataId, testOptionInstanceId, testOptionGroup); + String testOptionDataInfoId = + DataInfo.toDataInfoId(testOptionDataId, testOptionInstanceId, testOptionGroup); String testOptionAppNameOne = "test-option-app-1"; String testOptionAppNameTwo = "test-option-app-2"; int appOneNum = 3; @@ -229,20 +225,25 @@ public void testGetInterestsByOption() { Map appOneSubscribers = new HashMap<>(); for (int i = 0; i < appOneNum; i++) { - Subscriber subscriber = createSubscriber(testOptionDataId, testOptionGroup, testOptionInstanceId, testOptionAppNameOne); + Subscriber subscriber = + createSubscriber( + testOptionDataId, testOptionGroup, testOptionInstanceId, testOptionAppNameOne); appOneSubscribers.put(subscriber.getRegisterId(), subscriber); this.interests.add(subscriber); } Map appTwoSubscribers = new HashMap<>(); for (int i = 0; i < appTwoNum; i++) { - Subscriber subscriber = createSubscriber(testOptionDataId, testOptionGroup, testOptionInstanceId, testOptionAppNameTwo); + Subscriber subscriber = + createSubscriber( + testOptionDataId, testOptionGroup, testOptionInstanceId, testOptionAppNameTwo); appTwoSubscribers.put(subscriber.getRegisterId(), subscriber); this.interests.add(subscriber); } // 预期可以查询出全部的 app-1 - Collection subscribersResult1 = this.interests.getInterestsByOption(testOptionDataInfoId, testOptionAppNameOne, appOneNum); + Collection subscribersResult1 = + this.interests.getInterestsByOption(testOptionDataInfoId, testOptionAppNameOne, appOneNum); Assert.assertEquals(subscribersResult1.size(), appOneNum); // size 相同且每个元素都存在,那么就证明是 Assert.assertEquals(subscribersResult1.size(), appOneSubscribers.size()); @@ -253,7 +254,8 @@ public void testGetInterestsByOption() { } // 预期可以查询出 1 个 app-1 - Collection subscribersResult2 = this.interests.getInterestsByOption(testOptionDataInfoId, testOptionAppNameOne, 1); + Collection subscribersResult2 = + this.interests.getInterestsByOption(testOptionDataInfoId, testOptionAppNameOne, 1); Assert.assertEquals(subscribersResult2.size(), 1); Subscriber oneOfSubscribers2 = subscribersResult2.iterator().next(); Assert.assertEquals(oneOfSubscribers2.getAppName(), testOptionAppNameOne); @@ -261,11 +263,16 @@ public void testGetInterestsByOption() { Assert.assertTrue(appOneSubscribers.containsKey(oneOfSubscribers2.getRegisterId())); // 不设置任何查询条件,预期可以查询出全部的 app-1 和 app-2 - Collection subscribersResult3 = this.interests.getInterestsByOption(testOptionDataInfoId, null, 0); + Collection subscribersResult3 = + this.interests.getInterestsByOption(testOptionDataInfoId, null, 0); Assert.assertEquals(subscribersResult3.size(), appOneNum + appTwoNum); for (Subscriber subscriber : subscribersResult3) { - Assert.assertTrue(appOneSubscribers.containsKey(subscriber.getRegisterId()) || appTwoSubscribers.containsKey(subscriber.getRegisterId())); - Assert.assertTrue(subscriber.getAppName().equals(testOptionAppNameOne) || subscriber.getAppName().equals(testOptionAppNameTwo)); + Assert.assertTrue( + appOneSubscribers.containsKey(subscriber.getRegisterId()) + || appTwoSubscribers.containsKey(subscriber.getRegisterId())); + Assert.assertTrue( + subscriber.getAppName().equals(testOptionAppNameOne) + || subscriber.getAppName().equals(testOptionAppNameTwo)); Assert.assertEquals(subscriber.getDataInfoId(), testOptionDataInfoId); } } @@ -275,23 +282,29 @@ public void testGetSubscriberCountByApp() { String testCountDataId = "test-count-data-id"; String testCountGroup = "test-count-group"; String testCountInstanceId = "test-count-instance-id"; - String testCountDataInfoId = DataInfo.toDataInfoId(testCountDataId, testCountInstanceId, testCountGroup); + String testCountDataInfoId = + DataInfo.toDataInfoId(testCountDataId, testCountInstanceId, testCountGroup); String testCountAppNameOne = "test-count-app-1"; String testCountAppNameTwo = "test-count-app-2"; int appOneNum = 3; int appTwoNum = 2; for (int i = 0; i < appOneNum; i++) { - Subscriber subscriber = createSubscriber(testCountDataId, testCountGroup, testCountInstanceId, testCountAppNameOne); + Subscriber subscriber = + createSubscriber( + testCountDataId, testCountGroup, testCountInstanceId, testCountAppNameOne); this.interests.add(subscriber); } for (int i = 0; i < appTwoNum; i++) { - Subscriber subscriber = createSubscriber(testCountDataId, testCountGroup, testCountInstanceId, testCountAppNameTwo); + Subscriber subscriber = + createSubscriber( + testCountDataId, testCountGroup, testCountInstanceId, testCountAppNameTwo); this.interests.add(subscriber); } - List subscriberCountByApps = this.interests.getSubscriberCountByApp(testCountDataInfoId); + List subscriberCountByApps = + this.interests.getSubscriberCountByApp(testCountDataInfoId); Assert.assertEquals(subscriberCountByApps.size(), 2); for (SubscriberCountByApp subscriberCountByApp : subscriberCountByApps) { @@ -303,14 +316,14 @@ public void testGetSubscriberCountByApp() { } } - private Subscriber createSubscriber(String dataInfo, String group, String instanceId, String appName) { + private Subscriber createSubscriber( + String dataInfo, String group, String instanceId, String appName) { Subscriber subscriber = this.randomSubscriber(dataInfo, instanceId); subscriber.setAppName(appName); subscriber.setGroup(group); subscriber.setDataInfoId( - DataInfo.toDataInfoId( - subscriber.getDataId(), subscriber.getInstanceId(), subscriber.getGroup())); + DataInfo.toDataInfoId( + subscriber.getDataId(), subscriber.getInstanceId(), subscriber.getGroup())); return subscriber; } - } diff --git a/server/server/shared/pom.xml b/server/server/shared/pom.xml index 83ffd6452..9f3f01f3e 100644 --- a/server/server/shared/pom.xml +++ b/server/server/shared/pom.xml @@ -5,7 +5,7 @@ registry-server com.alipay.sofa - 6.6.0 + 6.6.1-auto-regulator 4.0.0 diff --git a/server/store/api/pom.xml b/server/store/api/pom.xml index c78c6a09b..7de0d1b84 100644 --- a/server/store/api/pom.xml +++ b/server/store/api/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-store - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/server/store/jdbc/pom.xml b/server/store/jdbc/pom.xml index 6e885e7be..9b41cbe6c 100644 --- a/server/store/jdbc/pom.xml +++ b/server/store/jdbc/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-store - 6.6.0 + 6.6.1-auto-regulator ../pom.xml diff --git a/server/store/jraft/pom.xml b/server/store/jraft/pom.xml index 33b39f945..3dca5d97d 100644 --- a/server/store/jraft/pom.xml +++ b/server/store/jraft/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa registry-store - 6.6.0 + 6.6.1-auto-regulator ../pom.xml diff --git a/server/store/pom.xml b/server/store/pom.xml index ca0764cf8..63723a38e 100644 --- a/server/store/pom.xml +++ b/server/store/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-server-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0 diff --git a/test/pom.xml b/test/pom.xml index 5b7720792..42cd0aaa9 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -5,7 +5,7 @@ com.alipay.sofa registry-parent - 6.6.0 + 6.6.1-auto-regulator ../pom.xml 4.0.0