Skip to content

Commit

Permalink
[type:refactor] refactor discovery plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
yunlongn committed Oct 24, 2024
1 parent 492977e commit 8b8466e
Show file tree
Hide file tree
Showing 52 changed files with 708 additions and 3,060 deletions.
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
<module>shenyu-dist</module>
<module>shenyu-alert</module>
<module>shenyu-sdk</module>
<module>shenyu-discovery</module>
<module>shenyu-registry</module>
<module>shenyu-kubernetes-controller</module>
</modules>
Expand Down
38 changes: 6 additions & 32 deletions shenyu-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -296,38 +296,6 @@
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>

<!-- shenyu discovery start -->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery-etcd</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery-nacos</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery-eureka</artifactId>
<exclusions>
<exclusion>
<groupId>jakarta.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
<version>${project.version}</version>
</dependency>
<!-- shenyu discovery end -->

<!-- shenyu-admin-listener start -->
<dependency>
<groupId>org.apache.shenyu</groupId>
Expand Down Expand Up @@ -358,6 +326,12 @@
<artifactId>shenyu-admin-listener-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-registry-core</artifactId>
<version>${project.version}</version>
</dependency>
<!-- shenyu-admin-listener end-->

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.shenyu.admin.discovery;

import org.apache.shenyu.admin.discovery.listener.DataChangedEventListener;
import org.apache.shenyu.admin.discovery.listener.DiscoveryDataChangedEvent;
import org.apache.shenyu.admin.exception.ShenyuAdminException;
import org.apache.shenyu.admin.listener.DataChangedEvent;
import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
Expand All @@ -25,7 +27,8 @@
import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
import org.apache.shenyu.registry.api.ShenyuInstanceRegisterRepository;
import org.apache.shenyu.registry.api.event.ChangedEventListener;

import java.util.Collections;
import java.util.Objects;
Expand All @@ -44,7 +47,7 @@ public APDiscoveryProcessor(final DiscoveryUpstreamMapper discoveryUpstreamMappe

@Override
public void createProxySelector(final DiscoveryHandlerDTO discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
ShenyuDiscoveryService shenyuDiscoveryService = getShenyuDiscoveryService(discoveryHandlerDTO.getDiscoveryId());
ShenyuInstanceRegisterRepository shenyuDiscoveryService = getShenyuDiscoveryService(discoveryHandlerDTO.getDiscoveryId());
String key = super.buildProxySelectorKey(discoveryHandlerDTO.getListenerNode());
if (Objects.isNull(shenyuDiscoveryService)) {
throw new ShenyuAdminException(String.format("before start ProxySelector you need init DiscoveryId=%s", discoveryHandlerDTO.getDiscoveryId()));
Expand All @@ -54,7 +57,18 @@ public void createProxySelector(final DiscoveryHandlerDTO discoveryHandlerDTO, f
LOG.info("shenyu discovery has watcher key = {}", key);
return;
}
shenyuDiscoveryService.watch(key, getDiscoveryDataChangedEventListener(discoveryHandlerDTO, proxySelectorDTO));
final DataChangedEventListener discoveryDataChangedEventListener = getDiscoveryDataChangedEventListener(discoveryHandlerDTO, proxySelectorDTO);
shenyuDiscoveryService.watchInstances(key, (selectKey, selectValue, event) -> {
if (event.equals(ChangedEventListener.Event.ADDED)) {
discoveryDataChangedEventListener.onChange(new DiscoveryDataChangedEvent(selectKey, selectValue, DiscoveryDataChangedEvent.Event.ADDED));
} else if (event.equals(ChangedEventListener.Event.UPDATED)) {
discoveryDataChangedEventListener.onChange(new DiscoveryDataChangedEvent(selectKey, selectValue, DiscoveryDataChangedEvent.Event.UPDATED));
} else if (event.equals(ChangedEventListener.Event.DELETED)) {
discoveryDataChangedEventListener.onChange(new DiscoveryDataChangedEvent(selectKey, selectValue, DiscoveryDataChangedEvent.Event.DELETED));
} else {
discoveryDataChangedEventListener.onChange(new DiscoveryDataChangedEvent(selectKey, selectValue, DiscoveryDataChangedEvent.Event.IGNORED));
}
});
cacheKey.add(key);
DataChangedEvent dataChangedEvent = new DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE,
Collections.singletonList(DiscoveryTransfer.INSTANCE.mapToData(proxySelectorDTO)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shenyu.admin.discovery;

import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.admin.discovery.listener.DataChangedEventListener;
import org.apache.shenyu.admin.discovery.parse.CustomDiscoveryUpstreamParser;
import org.apache.shenyu.admin.listener.DataChangedEvent;
import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
Expand All @@ -33,34 +34,34 @@
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.UUIDUtils;
import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
import org.apache.shenyu.registry.api.ShenyuInstanceRegisterRepository;
import org.apache.shenyu.registry.api.config.RegisterConfig;
import org.apache.shenyu.registry.api.entity.InstanceEntity;
import org.apache.shenyu.spi.ExtensionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Properties;
import java.util.HashSet;
import java.util.Collections;

public abstract class AbstractDiscoveryProcessor implements DiscoveryProcessor, ApplicationEventPublisherAware {

protected static final String DEFAULT_LISTENER_NODE = "/shenyu/discovery";

protected static final Logger LOG = LoggerFactory.getLogger(DefaultDiscoveryProcessor.class);

private final Map<String, ShenyuDiscoveryService> discoveryServiceCache;
private final Map<String, ShenyuInstanceRegisterRepository> discoveryServiceCache;

private final Map<String, Set<String>> dataChangedEventListenerCache;

Expand Down Expand Up @@ -88,11 +89,11 @@ public void createDiscovery(final DiscoveryDO discoveryDO) {
String type = discoveryDO.getType();
String props = discoveryDO.getProps();
Properties properties = GsonUtils.getGson().fromJson(props, Properties.class);
DiscoveryConfig discoveryConfig = new DiscoveryConfig();
discoveryConfig.setType(type);
RegisterConfig discoveryConfig = new RegisterConfig();
discoveryConfig.setRegisterType(type);
discoveryConfig.setProps(properties);
discoveryConfig.setServerList(discoveryDO.getServerList());
ShenyuDiscoveryService discoveryService = ExtensionLoader.getExtensionLoader(ShenyuDiscoveryService.class).getJoin(type);
discoveryConfig.setServerLists(discoveryDO.getServerList());
ShenyuInstanceRegisterRepository discoveryService = ExtensionLoader.getExtensionLoader(ShenyuInstanceRegisterRepository.class).getJoin(type);
discoveryService.init(discoveryConfig);
discoveryServiceCache.put(discoveryDO.getId(), discoveryService);
dataChangedEventListenerCache.put(discoveryDO.getId(), new HashSet<>());
Expand All @@ -106,12 +107,12 @@ public void createDiscovery(final DiscoveryDO discoveryDO) {
*/
@Override
public void removeDiscovery(final DiscoveryDO discoveryDO) {
ShenyuDiscoveryService shenyuDiscoveryService = discoveryServiceCache.remove(discoveryDO.getId());
ShenyuInstanceRegisterRepository shenyuDiscoveryService = discoveryServiceCache.remove(discoveryDO.getId());
if (shenyuDiscoveryService == null) {
return;
}
if (discoveryServiceCache.values().stream().noneMatch(p -> p.equals(shenyuDiscoveryService))) {
shenyuDiscoveryService.shutdown();
shenyuDiscoveryService.close();
LOG.info("shenyu discovery shutdown [{}] discovery", discoveryDO.getName());
}
}
Expand All @@ -123,11 +124,11 @@ public void removeDiscovery(final DiscoveryDO discoveryDO) {
*/
@Override
public void removeProxySelector(final DiscoveryHandlerDTO discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
ShenyuDiscoveryService shenyuDiscoveryService = discoveryServiceCache.get(discoveryHandlerDTO.getDiscoveryId());
ShenyuInstanceRegisterRepository shenyuDiscoveryService = discoveryServiceCache.get(discoveryHandlerDTO.getDiscoveryId());
String key = buildProxySelectorKey(discoveryHandlerDTO.getListenerNode());
Optional.ofNullable(dataChangedEventListenerCache.get(discoveryHandlerDTO.getDiscoveryId())).ifPresent(cacheKey -> {
cacheKey.remove(key);
shenyuDiscoveryService.unwatch(key);
shenyuDiscoveryService.unWatchInstances(key);
DataChangedEvent dataChangedEvent = new DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.DELETE,
Collections.singletonList(DiscoveryTransfer.INSTANCE.mapToData(proxySelectorDTO)));
eventPublisher.publishEvent(dataChangedEvent);
Expand All @@ -151,10 +152,21 @@ public void changeUpstream(final ProxySelectorDTO proxySelectorDTO, final List<D
public void fetchAll(final DiscoveryHandlerDTO discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
String discoveryId = discoveryHandlerDTO.getDiscoveryId();
if (discoveryServiceCache.containsKey(discoveryId)) {
ShenyuDiscoveryService shenyuDiscoveryService = discoveryServiceCache.get(discoveryId);
List<String> childData = shenyuDiscoveryService.getRegisterData(buildProxySelectorKey(discoveryHandlerDTO.getListenerNode()));
List<DiscoveryUpstreamData> discoveryUpstreamDataList = childData.stream().map(s -> GsonUtils.getGson().fromJson(s, DiscoveryUpstreamData.class))
.collect(Collectors.toList());
ShenyuInstanceRegisterRepository shenyuDiscoveryService = discoveryServiceCache.get(discoveryId);
final List<InstanceEntity> instanceEntities = shenyuDiscoveryService.selectInstances(buildProxySelectorKey(discoveryHandlerDTO.getListenerNode()));
List<DiscoveryUpstreamData> discoveryUpstreamDataList = instanceEntities.stream().map(instanceEntity -> {
final DiscoveryUpstreamData discoveryUpstreamData = new DiscoveryUpstreamData();
String uri = String.format("%s:%s", instanceEntity.getHost(), instanceEntity.getPort());
discoveryUpstreamData.setUrl(uri);
discoveryUpstreamData.setWeight(instanceEntity.getWeight());
discoveryUpstreamData.setStatus(instanceEntity.getStatus());
discoveryUpstreamData.setProtocol("http://");
if (discoveryUpstreamData.getNamespaceId() == null) {
discoveryUpstreamData.setNamespaceId(proxySelectorDTO.getNamespaceId());
}
discoveryUpstreamData.setDiscoveryHandlerId(proxySelectorDTO.getId());
return discoveryUpstreamData;
}).collect(Collectors.toList());
Set<String> urlList = discoveryUpstreamDataList.stream().map(DiscoveryUpstreamData::getUrl).collect(Collectors.toSet());
List<DiscoveryUpstreamDO> discoveryUpstreamDOS = discoveryUpstreamMapper.selectByDiscoveryHandlerId(discoveryHandlerDTO.getId());
Set<String> dbUrlList = discoveryUpstreamDOS.stream().map(DiscoveryUpstreamDO::getUrl).collect(Collectors.toSet());
Expand Down Expand Up @@ -227,7 +239,7 @@ public void setApplicationEventPublisher(final ApplicationEventPublisher eventPu
* @param discoveryId discoveryId
* @return ShenyuDiscoveryService
*/
public ShenyuDiscoveryService getShenyuDiscoveryService(final String discoveryId) {
public ShenyuInstanceRegisterRepository getShenyuDiscoveryService(final String discoveryId) {
return discoveryServiceCache.get(discoveryId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.shenyu.admin.discovery;

import org.apache.shenyu.admin.discovery.listener.DataChangedEventListener;
import org.apache.shenyu.admin.discovery.listener.DiscoveryDataChangedEvent;
import org.apache.shenyu.admin.exception.ShenyuAdminException;
import org.apache.shenyu.admin.listener.DataChangedEvent;
import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
Expand All @@ -25,7 +27,8 @@
import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
import org.apache.shenyu.registry.api.ShenyuInstanceRegisterRepository;
import org.apache.shenyu.registry.api.event.ChangedEventListener;

import java.util.Collections;
import java.util.Objects;
Expand All @@ -47,20 +50,28 @@ public DefaultDiscoveryProcessor(final DiscoveryUpstreamMapper discoveryUpstream

@Override
public void createProxySelector(final DiscoveryHandlerDTO discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
ShenyuDiscoveryService shenyuDiscoveryService = getShenyuDiscoveryService(discoveryHandlerDTO.getDiscoveryId());
ShenyuInstanceRegisterRepository shenyuInstanceRegisterRepository = getShenyuDiscoveryService(discoveryHandlerDTO.getDiscoveryId());
String key = buildProxySelectorKey(discoveryHandlerDTO.getListenerNode());
if (Objects.isNull(shenyuDiscoveryService)) {
if (Objects.isNull(shenyuInstanceRegisterRepository)) {
throw new ShenyuAdminException(String.format("before start ProxySelector you need init DiscoveryId=%s", discoveryHandlerDTO.getDiscoveryId()));
}
if (!shenyuDiscoveryService.exists(key)) {
throw new ShenyuAdminException(String.format("shenyu discovery start watcher need you has this key %s in Discovery", key));
}
Set<String> cacheKey = getCacheKey(discoveryHandlerDTO.getDiscoveryId());
if (Objects.nonNull(cacheKey) && cacheKey.contains(key)) {
LOG.info("shenyu discovery has watcher key = {}", key);
return;
}
shenyuDiscoveryService.watch(key, getDiscoveryDataChangedEventListener(discoveryHandlerDTO, proxySelectorDTO));
final DataChangedEventListener discoveryDataChangedEventListener = getDiscoveryDataChangedEventListener(discoveryHandlerDTO, proxySelectorDTO);
shenyuInstanceRegisterRepository.watchInstances(key, (selectKey, selectValue, event) -> {
if (event.equals(ChangedEventListener.Event.ADDED)) {
discoveryDataChangedEventListener.onChange(new DiscoveryDataChangedEvent(selectKey, selectValue, DiscoveryDataChangedEvent.Event.ADDED));
} else if (event.equals(ChangedEventListener.Event.UPDATED)) {
discoveryDataChangedEventListener.onChange(new DiscoveryDataChangedEvent(selectKey, selectValue, DiscoveryDataChangedEvent.Event.UPDATED));
} else if (event.equals(ChangedEventListener.Event.DELETED)) {
discoveryDataChangedEventListener.onChange(new DiscoveryDataChangedEvent(selectKey, selectValue, DiscoveryDataChangedEvent.Event.DELETED));
} else {
discoveryDataChangedEventListener.onChange(new DiscoveryDataChangedEvent(selectKey, selectValue, DiscoveryDataChangedEvent.Event.IGNORED));
}
});
cacheKey.add(key);
DataChangedEvent dataChangedEvent = new DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE,
Collections.singletonList(DiscoveryTransfer.INSTANCE.mapToData(proxySelectorDTO)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.admin.discovery.listener.DataChangedEventListener;
import org.apache.shenyu.admin.discovery.listener.DiscoveryDataChangedEvent;
import org.apache.shenyu.admin.discovery.parse.KeyValueParser;
import org.apache.shenyu.admin.listener.DataChangedEvent;
import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
Expand All @@ -29,8 +31,6 @@
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.utils.UUIDUtils;
import org.apache.shenyu.discovery.api.listener.DiscoveryDataChangedEvent;
import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
* limitations under the License.
*/

package org.apache.shenyu.discovery.api.listener;
package org.apache.shenyu.admin.discovery.listener;

/**
* Data changed listener.
*/
public interface DataChangedEventListener {

/**
* when data changed, fire this event.
*
*
* @param event data changed event
*/
void onChange(DiscoveryDataChangedEvent event);
}
}
Loading

0 comments on commit 8b8466e

Please sign in to comment.