Skip to content

Commit c7f98c1

Browse files
authored
fix TuGraphDbRpcClient (#24)
* fix TuGraphDbRpcClient * fix TuGraphDbRpcClient
1 parent 4e64dc4 commit c7f98c1

File tree

1 file changed

+23
-6
lines changed

1 file changed

+23
-6
lines changed

rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.google.protobuf.InvalidProtocolBufferException;
1515
import lgraph.Lgraph;
1616
import lombok.extern.slf4j.Slf4j;
17+
import org.apache.commons.collections4.CollectionUtils;
1718
import org.apache.commons.io.FileUtils;
1819
import org.apache.commons.lang3.StringUtils;
1920

@@ -24,6 +25,7 @@
2425
import java.nio.file.Files;
2526
import java.util.*;
2627
import java.util.concurrent.CopyOnWriteArrayList;
28+
import java.util.concurrent.CopyOnWriteArraySet;
2729

2830
/**
2931
* @Author: [email protected]
@@ -53,6 +55,7 @@ enum ClientType{
5355
// Attributes common to all types of clients
5456
private TuGraphSingleRpcClient leaderClient;
5557
private final List<TuGraphSingleRpcClient> rpcClientPool = new CopyOnWriteArrayList<>();
58+
private final Set<String> failUrls = new CopyOnWriteArraySet<>();
5659
private List<UserDefinedProcedure> userDefinedProcedures = new CopyOnWriteArrayList<>();
5760
private List<BuiltInProcedure> builtInProcedures = new CopyOnWriteArrayList<>();
5861

@@ -154,7 +157,7 @@ public String callGqlToLeader(String gql, String graph, double timeout, boolean
154157
}
155158

156159
public String callProcedure(String procedureType, String procedureName, String param, double procedureTimeOut,
157-
boolean inProcess, String graph) throws Exception {
160+
boolean inProcess, String graph) throws Exception {
158161
return callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, false);
159162
}
160163

@@ -188,7 +191,7 @@ public String callProcedure(String procedureType, String procedureName, String p
188191
}
189192

190193
public String callProcedureToLeader(String procedureType, String procedureName, String param, double procedureTimeOut,
191-
boolean inProcess, String graph) throws Exception {
194+
boolean inProcess, String graph) throws Exception {
192195
return callProcedureToLeader(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, false);
193196
}
194197

@@ -202,7 +205,7 @@ public String callProcedureToLeader(String procedureType, String procedureName,
202205
}
203206

204207
public boolean loadProcedure(String sourceFile, String procedureType, String procedureName, String codeType,
205-
String procedureDescription, boolean readOnly, String version, String graph) throws Exception {
208+
String procedureDescription, boolean readOnly, String version, String graph) throws Exception {
206209
if (clientType == ClientType.SINGLE_CONNECTION) {
207210
return baseClient.loadProcedure(sourceFile, procedureType, procedureName, codeType, procedureDescription, readOnly, version, graph);
208211
} else {
@@ -356,7 +359,10 @@ private TuGraphSingleRpcClient getClient(Lgraph.ProtoGraphQueryType type, String
356359

357360
private TuGraphSingleRpcClient getClient(boolean isReadQuery) throws Exception {
358361
if (isReadQuery) {
359-
if (rpcClientPool.size() == 0){
362+
if (CollectionUtils.isNotEmpty(failUrls)) {
363+
loadRpcClient(failUrls);
364+
}
365+
if (CollectionUtils.isEmpty(rpcClientPool)) {
360366
throw new Exception("all instance is down, refuse req!");
361367
}
362368
TuGraphSingleRpcClient rpcClient = rpcClientPool.get(rpcClientPool.size() - 1);
@@ -395,16 +401,27 @@ private void refreshClientPool() {
395401
}
396402
});
397403
} else {
398-
for (String url : urls) {
404+
loadRpcClient(new HashSet<>(urls));
405+
}
406+
}
407+
408+
private void loadRpcClient(Set<String> urlList) {
409+
Set<String> failList = new HashSet<>();
410+
for (String url: urlList) {
411+
try {
399412
TuGraphSingleRpcClient rpcClient = new TuGraphSingleRpcClient("list://" + url, user, password);
400413
String result = rpcClient.callCypher("CALL dbms.ha.clusterInfo()", "default", 10);
401414
ClusterInfo clusterInfo = JSON.parseObject(JSON.parseArray(result).get(0).toString(), new TypeReference<ClusterInfo>(){});
402415
if (clusterInfo.isMaster()) {
403416
leaderClient = rpcClient;
404417
}
405418
rpcClientPool.add(rpcClient);
419+
} catch (Exception e) {
420+
failList.add(url);
406421
}
407422
}
423+
failUrls.clear();
424+
failUrls.addAll(failList);
408425
}
409426

410427
/**
@@ -438,9 +455,9 @@ private <E> E doubleCheckQuery(QueryInterface<E> queryInterface) throws Exceptio
438455
return queryInterface.method();
439456
} catch (Exception e2) {
440457
log.error(e2.getMessage());
441-
throw e2;
442458
}
443459
}
460+
return null;
444461
}
445462

446463
private static class TuGraphSingleRpcClient {

0 commit comments

Comments
 (0)