From 023ae24422bd46de8a960b718dd75dc8aa18deda Mon Sep 17 00:00:00 2001 From: juipengchang Date: Wed, 15 Jan 2025 15:19:42 +0800 Subject: [PATCH 1/2] fix TuGraphDbRpcClient --- .../antgroup/tugraph/TuGraphDbRpcClient.java | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java b/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java index 1c711c4..864da41 100644 --- a/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java +++ b/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java @@ -14,6 +14,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import lgraph.Lgraph; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; @@ -24,6 +25,7 @@ import java.nio.file.Files; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; /** * @Author: haoyongdong.hyd@antgroup.com @@ -53,6 +55,7 @@ enum ClientType{ // Attributes common to all types of clients private TuGraphSingleRpcClient leaderClient; private final List rpcClientPool = new CopyOnWriteArrayList<>(); + private final Set failUrls = new CopyOnWriteArraySet<>(); private List userDefinedProcedures = new CopyOnWriteArrayList<>(); private List builtInProcedures = new CopyOnWriteArrayList<>(); @@ -154,7 +157,7 @@ public String callGqlToLeader(String gql, String graph, double timeout, boolean } public String callProcedure(String procedureType, String procedureName, String param, double procedureTimeOut, - boolean inProcess, String graph) throws Exception { + boolean inProcess, String graph) throws Exception { return callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, false); } @@ -188,7 +191,7 @@ public String callProcedure(String procedureType, String procedureName, String p } public String callProcedureToLeader(String procedureType, String procedureName, String param, double procedureTimeOut, - boolean inProcess, String graph) throws Exception { + boolean inProcess, String graph) throws Exception { return callProcedureToLeader(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, false); } @@ -202,7 +205,7 @@ public String callProcedureToLeader(String procedureType, String procedureName, } public boolean loadProcedure(String sourceFile, String procedureType, String procedureName, String codeType, - String procedureDescription, boolean readOnly, String version, String graph) throws Exception { + String procedureDescription, boolean readOnly, String version, String graph) throws Exception { if (clientType == ClientType.SINGLE_CONNECTION) { return baseClient.loadProcedure(sourceFile, procedureType, procedureName, codeType, procedureDescription, readOnly, version, graph); } else { @@ -356,9 +359,12 @@ private TuGraphSingleRpcClient getClient(Lgraph.ProtoGraphQueryType type, String private TuGraphSingleRpcClient getClient(boolean isReadQuery) throws Exception { if (isReadQuery) { - if (rpcClientPool.size() == 0){ + if (CollectionUtils.isEmpty(rpcClientPool)) { throw new Exception("all instance is down, refuse req!"); } + if (CollectionUtils.isNotEmpty(failUrls)) { + loadRpcClient(failUrls); + } TuGraphSingleRpcClient rpcClient = rpcClientPool.get(rpcClientPool.size() - 1); loadBalance(); return rpcClient; @@ -395,7 +401,14 @@ private void refreshClientPool() { } }); } else { - for (String url : urls) { + loadRpcClient(new HashSet<>(urls)); + } + } + + private void loadRpcClient(Set urlList) { + Set failList = new HashSet<>(); + for (String url: urlList) { + try { TuGraphSingleRpcClient rpcClient = new TuGraphSingleRpcClient("list://" + url, user, password); String result = rpcClient.callCypher("CALL dbms.ha.clusterInfo()", "default", 10); ClusterInfo clusterInfo = JSON.parseObject(JSON.parseArray(result).get(0).toString(), new TypeReference(){}); @@ -403,8 +416,12 @@ private void refreshClientPool() { leaderClient = rpcClient; } rpcClientPool.add(rpcClient); + } catch (Exception e) { + failList.add(url); } } + failUrls.clear(); + failUrls.addAll(failList); } /** @@ -438,9 +455,9 @@ private E doubleCheckQuery(QueryInterface queryInterface) throws Exceptio return queryInterface.method(); } catch (Exception e2) { log.error(e2.getMessage()); - throw e2; } } + return null; } private static class TuGraphSingleRpcClient { From 61bb1a6d758ea63aca386f1928adbadc7103c320 Mon Sep 17 00:00:00 2001 From: juipengchang Date: Sun, 19 Jan 2025 18:54:06 +0800 Subject: [PATCH 2/2] fix TuGraphDbRpcClient --- .../main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java b/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java index 864da41..ccf7695 100644 --- a/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java +++ b/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java @@ -359,12 +359,12 @@ private TuGraphSingleRpcClient getClient(Lgraph.ProtoGraphQueryType type, String private TuGraphSingleRpcClient getClient(boolean isReadQuery) throws Exception { if (isReadQuery) { - if (CollectionUtils.isEmpty(rpcClientPool)) { - throw new Exception("all instance is down, refuse req!"); - } if (CollectionUtils.isNotEmpty(failUrls)) { loadRpcClient(failUrls); } + if (CollectionUtils.isEmpty(rpcClientPool)) { + throw new Exception("all instance is down, refuse req!"); + } TuGraphSingleRpcClient rpcClient = rpcClientPool.get(rpcClientPool.size() - 1); loadBalance(); return rpcClient;