Skip to content

Commit

Permalink
Fix tars register issue (#1211)
Browse files Browse the repository at this point in the history
* feature: add handlerSelector in TarsPluginDataHandler.

* feature: Cache context for upstreamList.

* fix: metaData.getContextPath() is NPE.

* fix: updte contextPath.

Co-authored-by: dengliming <[email protected]>
Co-authored-by: chenlu.666 <[email protected]>
  • Loading branch information
3 people authored Apr 2, 2021
1 parent 6d9a9bf commit 57d868f
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public synchronized String registerTars(final MetaDataRegisterDTO dto) {
}
final MetaDataDO exist = metaDataMapper.findByServiceNameAndMethod(dto.getServiceName(), dto.getMethodName());
saveOrUpdateMetaData(exist, dto);
String selectorId = handlerTarsSelector(dto);
String selectorId = handlerSelector(dto);
handlerTarsRule(selectorId, dto, exist);
return SoulResultMessage.SUCCESS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;

Expand Down Expand Up @@ -51,4 +52,15 @@ public class MetaData implements Serializable {
private String rpcExt;

private Boolean enabled;

/**
* update ContextPath.
*
* @author HoldDie
*/
public void updateContextPath() {
if (StringUtils.isNoneBlank(this.path)) {
this.contextPath = this.path.substring(0, StringUtils.indexOf(path, "/", 1));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.assertj.core.internal.bytebuddy.dynamic.DynamicType;
import org.assertj.core.internal.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import org.dromara.soul.common.dto.MetaData;
import org.dromara.soul.common.dto.SelectorData;
import org.dromara.soul.common.dto.convert.DivideUpstream;
import org.dromara.soul.common.exception.SoulException;
import org.dromara.soul.common.utils.GsonUtils;
import org.dromara.soul.plugin.tars.proxy.TarsInvokePrx;
Expand All @@ -45,12 +47,14 @@
import org.dromara.soul.plugin.tars.util.ReturnValueResolver;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
* Tars config cache.
Expand All @@ -74,10 +78,14 @@ public TarsInvokePrxList load(final String key) {
}
});

private final ConcurrentHashMap<String, List<MetaData>> ctxPathCache = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, Class<?>> prxClassCache = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, TarsParamInfo> prxParamCache = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, List<DivideUpstream>> refreshUpstreamCache = new ConcurrentHashMap<>();

private final Communicator communicator;

private ApplicationConfigCache() {
Expand All @@ -91,7 +99,7 @@ private int getSize() {
/**
* Get reference config.
*
* @param path path
* @param path path
* @return the reference config
*/
public TarsInvokePrxList get(final String path) {
Expand All @@ -110,7 +118,7 @@ public TarsInvokePrxList get(final String path) {
@SuppressWarnings("all")
public void initPrx(final MetaData metaData) {
for (; ;) {
Class<?> prxClass = prxClassCache.get(metaData.getServiceName());
Class<?> prxClass = prxClassCache.get(metaData.getPath());
try {
if (Objects.isNull(prxClass)) {
assert LOCK != null;
Expand Down Expand Up @@ -147,24 +155,20 @@ public void initPrx(final MetaData metaData) {
.load(Servant.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
.getLoaded();
assert communicator != null;
prxClassCache.put(metaData.getServiceName(), prxClazz);
prxClassCache.put(metaData.getPath(), prxClazz);
List<MetaData> paths = ctxPathCache.getOrDefault(metaData.getContextPath(), new ArrayList<>());
if (!paths.contains(metaData.getPath())) {
paths.add(metaData);
}
ctxPathCache.put(metaData.getContextPath(), paths);
} finally {
LOCK.unlock();
}
}
} else {
// if object name is same it will return same prx
Object prx = communicator.stringToProxy(prxClass, PrxInfoUtil.getObjectName(metaData));
TarsInvokePrxList tarsInvokePrxList = cache.get(metaData.getPath());
if (tarsInvokePrxList.getMethod() == null) {
TarsParamInfo tarsParamInfo = prxParamCache.get(getClassMethodKey(prxClass.getName(), metaData.getMethodName()));
Method method = prx.getClass().getDeclaredMethod(
PrxInfoUtil.getMethodName(metaData.getMethodName()), tarsParamInfo.getParamTypes());
tarsInvokePrxList.setMethod(method);
tarsInvokePrxList.setParamTypes(tarsParamInfo.getParamTypes());
tarsInvokePrxList.setParamNames(tarsParamInfo.getParamNames());
if (Objects.nonNull(metaData.getContextPath()) && Objects.nonNull(refreshUpstreamCache.get(metaData.getContextPath()))) {
refreshTarsInvokePrxList(metaData, refreshUpstreamCache.get(metaData.getContextPath()));
}
tarsInvokePrxList.getTarsInvokePrxList().add(new TarsInvokePrx(prx, metaData.getAppName()));
break;
}
} catch (Exception e) {
Expand All @@ -177,7 +181,7 @@ public void initPrx(final MetaData metaData) {
/**
* Get param info key.
*
* @param className className
* @param className className
* @param methodName methodName
* @return the key
*/
Expand All @@ -195,6 +199,69 @@ public static ApplicationConfigCache getInstance() {
return ApplicationConfigCacheInstance.INSTANCE;
}

/**
* initPrxClass.
*
* @param selectorData selectorData
*/
public void initPrxClass(final SelectorData selectorData) {
try {
final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);
if (null == upstreamList || upstreamList.size() == 0) {
invalidate(selectorData.getName());
return;
}
refreshUpstreamCache.put(selectorData.getName(), upstreamList);
List<MetaData> metaDataList = ctxPathCache.getOrDefault(selectorData.getName(), new ArrayList<>());
for (MetaData metaData : metaDataList) {
refreshTarsInvokePrxList(metaData, upstreamList);
}
} catch (ExecutionException | NoSuchMethodException e) {
throw new SoulException(e.getCause());
}
}

/**
* refresh metaData path upstream url.
*
* @param metaData metaData
* @param upstreamList upstream list
*/
private void refreshTarsInvokePrxList(final MetaData metaData, final List<DivideUpstream> upstreamList) throws NoSuchMethodException, ExecutionException {
Class<?> prxClass = prxClassCache.get(metaData.getPath());
if (Objects.isNull(prxClass)) {
return;
}
TarsInvokePrxList tarsInvokePrxList = cache.get(metaData.getPath());
tarsInvokePrxList.getTarsInvokePrxList().clear();
if (tarsInvokePrxList.getMethod() == null) {
TarsParamInfo tarsParamInfo = prxParamCache.get(getClassMethodKey(prxClass.getName(), metaData.getMethodName()));
Object prx = communicator.stringToProxy(prxClass, PrxInfoUtil.getObjectName(upstreamList.get(0).getUpstreamUrl(), metaData.getServiceName()));
Method method = prx.getClass().getDeclaredMethod(
PrxInfoUtil.getMethodName(metaData.getMethodName()), tarsParamInfo.getParamTypes());
tarsInvokePrxList.setMethod(method);
tarsInvokePrxList.setParamTypes(tarsParamInfo.getParamTypes());
tarsInvokePrxList.setParamNames(tarsParamInfo.getParamNames());
}
tarsInvokePrxList.getTarsInvokePrxList().addAll(upstreamList.stream().map(upstream -> {
Object strProxy = communicator.stringToProxy(prxClass, PrxInfoUtil.getObjectName(upstream.getUpstreamUrl(), metaData.getServiceName()));
return new TarsInvokePrx(strProxy, upstream.getUpstreamUrl());
}).collect(Collectors.toList()));
}

/**
* invalidate.
*
* @param contextPath context path
* @author HoldDie
*/
public void invalidate(final String contextPath) {
List<MetaData> metaDataList = ctxPathCache.getOrDefault(contextPath, new ArrayList<>());
for (MetaData metaData : metaDataList) {
cache.invalidate(metaData.getPath());
}
}

/**
* The type Application config cache instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
package org.dromara.soul.plugin.tars.handler;

import org.dromara.soul.common.dto.PluginData;
import org.dromara.soul.common.dto.SelectorData;
import org.dromara.soul.common.enums.PluginEnum;
import org.dromara.soul.plugin.base.handler.PluginDataHandler;
import org.dromara.soul.plugin.tars.cache.ApplicationConfigCache;

import java.util.Objects;

/**
* The type tars plugin data handler.
Expand All @@ -36,4 +40,21 @@ public void handlerPlugin(final PluginData pluginData) {
public String pluginNamed() {
return PluginEnum.TARS.getName();
}

@Override
public void handlerSelector(final SelectorData selectorData) {
if (Objects.isNull(selectorData.getName())) {
return;
}
ApplicationConfigCache.getInstance().initPrxClass(selectorData);
}

@Override
public void removeSelector(final SelectorData selectorData) {
if (Objects.isNull(selectorData.getName())) {
return;
}
ApplicationConfigCache.getInstance().invalidate(selectorData.getName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class TarsMetaDataSubscriber implements MetaDataSubscriber {

@Override
public void onSubscribe(final MetaData metaData) {
metaData.updateContextPath();
if (RpcTypeEnum.TARS.getName().equals(metaData.getRpcType())) {
MetaData metaExist = META_DATA.get(metaData.getPath());
List<TarsInvokePrx> prxList = ApplicationConfigCache.getInstance()
Expand All @@ -57,6 +58,7 @@ public void onSubscribe(final MetaData metaData) {

@Override
public void unSubscribe(final MetaData metaData) {
metaData.updateContextPath();
if (RpcTypeEnum.TARS.getName().equals(metaData.getRpcType())) {
List<TarsInvokePrx> prxList = ApplicationConfigCache.getInstance()
.get(metaData.getPath()).getTarsInvokePrxList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,13 @@ public static String getMethodName(final String methodName) {
/**
* Get objectName to get tars proxy.
*
* @param metaData metaData
* @param upstreamUrl upstream url
* @param serviceName service name
* @return objectName
*/
public static String getObjectName(final MetaData metaData) {
String[] ipAndPort = metaData.getAppName().split(":");
return metaData.getServiceName() + "@tcp -h " + ipAndPort[0] + " -p " + ipAndPort[1];
public static String getObjectName(final String upstreamUrl, final String serviceName) {
String[] ipAndPort = upstreamUrl.split(":");
return serviceName + "@tcp -h " + ipAndPort[0] + " -p " + ipAndPort[1];
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ public void testGetMethodName() {

@Test
public void testGetObjectName() {
final MetaData metaData = new MetaData("id", "127.0.0.1:8080", "contextPath",
"path", "rpcType", "serviceName", "methodName",
"parameterTypes", "rpcExt", false);
final String result = PrxInfoUtil.getObjectName(metaData);
final String result = PrxInfoUtil.getObjectName("127.0.0.1:8080", "serviceName");
assertEquals("serviceName@tcp -h 127.0.0.1 -p 8080", result);
}

Expand Down

0 comments on commit 57d868f

Please sign in to comment.