From ec0441e9d900fa271bd2f9da60aae1019dc4ea40 Mon Sep 17 00:00:00 2001 From: lipanpan03 <41904587+lipanpan03@users.noreply.github.com> Date: Thu, 6 Jun 2024 19:46:31 +0800 Subject: [PATCH] Add call cypher result interface (#16) * add cypher result interface * add cypher result interface * add cypher result interface * add cypher result interface * fix cypher result * fix cypher result * fix cypher result * modify version * modify header * modify header --- pom.xml | 2 +- .../tugraph/TuGraphDbRpcClientTest.java | 7 +- .../tugraph/TuGraphDbRpcClientUtil.java | 4 +- .../antgroup/tugraph/TuGraphDbRpcClient.java | 164 ++++++++++++++---- 4 files changed, 138 insertions(+), 39 deletions(-) diff --git a/pom.xml b/pom.xml index af0d766..65c1757 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ - 1.4.1 + 1.4.2 UTF-8 4.1.44.Final 1.18.28 diff --git a/rpc-client-test/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClientTest.java b/rpc-client-test/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClientTest.java index 45f4f67..9dfc8db 100644 --- a/rpc-client-test/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClientTest.java +++ b/rpc-client-test/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClientTest.java @@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; public class TuGraphDbRpcClientTest { static Logger log = LoggerFactory.getLogger(TuGraphDbRpcClientTest.class); @@ -15,11 +14,13 @@ public static void deleteProcedure(TuGraphDbRpcClient client) { log.info("----------------testDeleteProcedure--------------------"); try { boolean result = client.deleteProcedure("CPP", "sortstr", "default"); - log.info("deleteProcedure : " + result); + log.info("deleteProcedure sortstr : " + result); assert (result); // should throw TuGraphDbRpcException result = client.deleteProcedure("CPP", "scan_graph", "default"); - log.info("loadProcedure : " + result); + log.info("deleteProcedure scan_graph : " + result); + result = client.deleteProcedure("CPP", "multi_file", "default"); + log.info("deleteProcedure multi_file : " + result); } catch (Exception e) { log.info("catch Exception : " + e.getMessage()); } diff --git a/rpc-client-test/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClientUtil.java b/rpc-client-test/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClientUtil.java index c04db59..1f48da5 100644 --- a/rpc-client-test/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClientUtil.java +++ b/rpc-client-test/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClientUtil.java @@ -126,6 +126,8 @@ static void importDataFromContent(Logger log, TuGraphDbRpcClient client, boolean JSONObject jsonObject = jsonArray.getJSONObject(0); assert (jsonObject.containsKey("COUNT(n)")); assert (jsonObject.getIntValue("COUNT(n)") == 13); + res = client.callCypher("MATCH (n) RETURN n,n.name LIMIT 3", "default", 10, true); + log.info("MATCH (n) RETURN n,n.name LIMIT 3 : " + res); } static void importSchemaFromFile(Logger log, TuGraphDbRpcClient client, boolean isHa) throws Exception { @@ -190,7 +192,7 @@ static void importDataFromFile(Logger log, TuGraphDbRpcClient client, boolean is assert (jsonObject.containsKey("COUNT(n)")); assert (jsonObject.getIntValue("COUNT(n)") == 13); - res = client.callCypher("match(n) -[r]->(m) return count(r)", "default", 1000); + res = client.callCypher("match (n)-[r]->(m) return count(r)", "default", 1000); log.info("match(n) -[r]->(m) return count(r) : " + res); jsonArray = JSONArray.parseArray(res); assert (jsonArray.size() == 1); diff --git a/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java b/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java index 01bcc0f..1c711c4 100644 --- a/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java +++ b/rpc-client/src/main/java/com/antgroup/tugraph/TuGraphDbRpcClient.java @@ -11,6 +11,7 @@ import com.baidu.brpc.client.loadbalance.LoadBalanceStrategy; import com.baidu.brpc.protocol.Options; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import lgraph.Lgraph; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -19,7 +20,6 @@ import java.io.BufferedInputStream; import java.io.File; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.*; @@ -82,18 +82,26 @@ public TuGraphDbRpcClient(List urls, String user, String password) throw } public String callCypher(String cypher, String graph, double timeout) throws Exception { + return callCypher(cypher, graph, timeout, false); + } + + public String callCypher(String cypher, String graph, double timeout, boolean withHeader) throws Exception { if (clientType == ClientType.SINGLE_CONNECTION){ - return baseClient.callCypher(cypher, graph, timeout); + return baseClient.callCypher(cypher, graph, timeout, withHeader); } else { - return doubleCheckQuery(()-> getClient(Lgraph.ProtoGraphQueryType.CYPHER, cypher, graph).callCypher(cypher, graph, timeout)); + return doubleCheckQuery(()-> getClient(Lgraph.ProtoGraphQueryType.CYPHER, cypher, graph).callCypher(cypher, graph, timeout, withHeader)); } } public String callGql(String gql, String graph, double timeout) throws Exception { + return callGql(gql, graph, timeout, false); + } + + public String callGql(String gql, String graph, double timeout, boolean withHeader) throws Exception { if (clientType == ClientType.SINGLE_CONNECTION){ - return baseClient.callGql(gql, graph, timeout); + return baseClient.callGql(gql, graph, timeout, withHeader); } else { - return doubleCheckQuery(()-> getClient(Lgraph.ProtoGraphQueryType.GQL, gql, graph).callGql(gql, graph, timeout)); + return doubleCheckQuery(()-> getClient(Lgraph.ProtoGraphQueryType.GQL, gql, graph).callGql(gql, graph, timeout, withHeader)); } } @@ -101,10 +109,18 @@ public String callCypher(String cypher, String graph, double timeout, String url return doubleCheckQuery(()-> getClientByNode(url).callCypher(cypher, graph, timeout)); } + public String callCypher(String cypher, String graph, double timeout, String url, boolean withHeader) throws Exception { + return doubleCheckQuery(()-> getClientByNode(url).callCypher(cypher, graph, timeout, withHeader)); + } + public String callGql(String gql, String graph, double timeout, String url) throws Exception { return doubleCheckQuery(()-> getClientByNode(url).callGql(gql, graph, timeout)); } + public String callGql(String gql, String graph, double timeout, String url, boolean withHeader) throws Exception { + return doubleCheckQuery(()-> getClientByNode(url).callGql(gql, graph, timeout, withHeader)); + } + public String callCypherToLeader(String cypher, String graph, double timeout) throws Exception { if (clientType == ClientType.SINGLE_CONNECTION){ return baseClient.callCypher(cypher, graph, timeout); @@ -113,6 +129,14 @@ public String callCypherToLeader(String cypher, String graph, double timeout) th } } + public String callCypherToLeader(String cypher, String graph, double timeout, boolean withHeader) throws Exception { + if (clientType == ClientType.SINGLE_CONNECTION){ + return baseClient.callCypher(cypher, graph, timeout, withHeader); + } else { + return doubleCheckQuery(()-> leaderClient.callCypher(cypher, graph, timeout, withHeader)); + } + } + public String callGqlToLeader(String gql, String graph, double timeout) throws Exception { if (clientType == ClientType.SINGLE_CONNECTION){ return baseClient.callGql(gql, graph, timeout); @@ -121,15 +145,23 @@ public String callGqlToLeader(String gql, String graph, double timeout) throws E } } + public String callGqlToLeader(String gql, String graph, double timeout, boolean withHeader) throws Exception { + if (clientType == ClientType.SINGLE_CONNECTION){ + return baseClient.callGql(gql, graph, timeout, withHeader); + } else { + return doubleCheckQuery(()-> leaderClient.callGql(gql, graph, timeout, withHeader)); + } + } + public String callProcedure(String procedureType, String procedureName, String param, double procedureTimeOut, boolean inProcess, String graph) throws Exception { return callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, false); } public String callProcedure(String procedureType, String procedureName, String param, double procedureTimeOut, - boolean inProcess, String graph, boolean jsonFormat) throws Exception { + boolean inProcess, String graph, boolean withHeader) throws Exception { if (clientType == ClientType.SINGLE_CONNECTION) { - return baseClient.callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, jsonFormat); + return baseClient.callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, withHeader); } else { return doubleCheckQuery(()-> { boolean readOnly = false; @@ -140,7 +172,7 @@ public String callProcedure(String procedureType, String procedureName, String p } } return getClient(readOnly) - .callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, jsonFormat); + .callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, withHeader); }); } } @@ -151,8 +183,8 @@ public String callProcedure(String procedureType, String procedureName, String p } public String callProcedure(String procedureType, String procedureName, String param, double procedureTimeOut, - boolean inProcess, String graph, boolean jsonFormat, String url) throws Exception { - return doubleCheckQuery(()-> getClientByNode(url).callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, jsonFormat)); + boolean inProcess, String graph, boolean withHeader, String url) throws Exception { + return doubleCheckQuery(()-> getClientByNode(url).callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, withHeader)); } public String callProcedureToLeader(String procedureType, String procedureName, String param, double procedureTimeOut, @@ -161,11 +193,11 @@ public String callProcedureToLeader(String procedureType, String procedureName, } public String callProcedureToLeader(String procedureType, String procedureName, String param, double procedureTimeOut, - boolean inProcess, String graph, boolean jsonFormat) throws Exception { + boolean inProcess, String graph, boolean withHeader) throws Exception { if (clientType == ClientType.SINGLE_CONNECTION) { - return baseClient.callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, jsonFormat); + return baseClient.callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, withHeader); } else { - return doubleCheckQuery(()-> leaderClient.callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, jsonFormat)); + return doubleCheckQuery(()-> leaderClient.callProcedure(procedureType, procedureName, param, procedureTimeOut, inProcess, graph, withHeader)); } } @@ -443,9 +475,9 @@ public TuGraphSingleRpcClient(String url, String user, String pass) { this.url = url; } - private String handleGraphQueryRequest(Lgraph.ProtoGraphQueryType type, String query, String graph, double timeout) { + private String handleGraphQueryRequest(Lgraph.ProtoGraphQueryType type, String query, String graph, double timeout, boolean withHeader) { Lgraph.GraphQueryRequest queryRequest = - Lgraph.GraphQueryRequest.newBuilder().setType(type).setQuery(query).setResultInJsonFormat(true) + Lgraph.GraphQueryRequest.newBuilder().setType(type).setQuery(query).setResultInJsonFormat(!withHeader) .setGraph(graph).setTimeout(timeout).build(); Lgraph.LGraphRequest request = Lgraph.LGraphRequest.newBuilder().setGraphQueryRequest(queryRequest).setToken(this.token) @@ -455,15 +487,71 @@ private String handleGraphQueryRequest(Lgraph.ProtoGraphQueryType type, String q throw new TuGraphDbRpcException(response.getErrorCode(), response.getError(), "handleGraphQueryRequest"); } serverVersion = Math.max(response.getServerVersion(), serverVersion); - return response.getGraphQueryResponse().getJsonResult(); + if (!withHeader){ + return response.getGraphQueryResponse().getJsonResult(); + } else { + Lgraph.GraphQueryResult graphQueryResult = response.getGraphQueryResponse().getBinaryResult(); + JSONObject ans = new JSONObject(); + JSONArray header = new JSONArray(), result = new JSONArray(); + for (Lgraph.Header headerKV : graphQueryResult.getHeaderList()) { + JSONObject headerItem = new JSONObject(); + headerItem.put("name", headerKV.getName()); + headerItem.put("type", headerKV.getType()); + header.add(headerItem); + } + ans.put("header", header); + for (Lgraph.ListOfProtoFieldData resultData : graphQueryResult.getResultList()) { + JSONArray resultItem = new JSONArray(); + for (Lgraph.ProtoFieldData fieldData : resultData.getValuesList()) { + switch (fieldData.getDataCase()) { + case BOOLEAN: + resultItem.add(fieldData.getBoolean()); + break; + case INT8_: + resultItem.add(fieldData.getInt8()); + break; + case INT16_: + resultItem.add(fieldData.getInt16()); + break; + case INT32_: + resultItem.add(fieldData.getInt32()); + break; + case INT64_: + resultItem.add(fieldData.getInt64()); + break; + case SP: + resultItem.add(fieldData.getSp()); + break; + case DP: + resultItem.add(fieldData.getDp()); + break; + case DATE: + resultItem.add(fieldData.getDate()); + break; + case DATETIME: + resultItem.add(fieldData.getDatetime()); + break; + case STR: + resultItem.add(fieldData.getStr()); + break; + case BLOB: + resultItem.add(fieldData.getBlob()); + break; + } + } + result.add(resultItem); + } + ans.put("result", result); + return ans.toString(); + } } - private String handleCypherRequest(String query, String graph, double timeout) { - return handleGraphQueryRequest(Lgraph.ProtoGraphQueryType.CYPHER, query, graph, timeout); + private String handleCypherRequest(String query, String graph, double timeout, boolean withHeader) { + return handleGraphQueryRequest(Lgraph.ProtoGraphQueryType.CYPHER, query, graph, timeout, withHeader); } - private String handleGqlRequest(String query, String graph, double timeout) { - return handleGraphQueryRequest(Lgraph.ProtoGraphQueryType.GQL, query, graph, timeout); + private String handleGqlRequest(String query, String graph, double timeout, boolean withHeader) { + return handleGraphQueryRequest(Lgraph.ProtoGraphQueryType.GQL, query, graph, timeout, withHeader); } // parse delimiter and process strings like \r \n \002 \0xA @@ -659,12 +747,20 @@ public String getUrl() { return url; } + public String callCypher(String cypher, String graph, double timeout, boolean withHeader) { + return handleCypherRequest(cypher, graph, timeout, withHeader); + } + public String callCypher(String cypher, String graph, double timeout) { - return handleCypherRequest(cypher, graph, timeout); + return handleCypherRequest(cypher, graph, timeout, false); + } + + public String callGql(String gql, String graph, double timeout, boolean withHeader) { + return handleGqlRequest(gql, graph, timeout, withHeader); } public String callGql(String gql, String graph, double timeout) { - return handleGqlRequest(gql, graph, timeout); + return handleGqlRequest(gql, graph, timeout, false); } public String callProcedure(String procedureType, String procedureName, String param, double procedureTimeOut, @@ -676,18 +772,18 @@ public String callProcedure(String procedureType, String procedureName, String p } public String callProcedure(String procedureType, String procedureName, String param, double procedureTimeOut, - boolean inProcess, String graph, boolean jsonFormat) { + boolean inProcess, String graph, boolean withHeader) { Lgraph.PluginRequest.PluginType type = "CPP".equals(procedureType) ? Lgraph.PluginRequest.PluginType.CPP : Lgraph.PluginRequest.PluginType.PYTHON; - ByteString resp = callProcedure(type, procedureName, ByteString.copyFromUtf8(param), graph, procedureTimeOut, inProcess, jsonFormat); + ByteString resp = callProcedure(type, procedureName, ByteString.copyFromUtf8(param), graph, procedureTimeOut, inProcess, withHeader); return resp.toStringUtf8(); } public ByteString callProcedure(Lgraph.PluginRequest.PluginType type, String name, ByteString param, - String graph, double timeout, boolean inProcess, boolean jsonFormat) { + String graph, double timeout, boolean inProcess, boolean withHeader) { Lgraph.CallPluginRequest vreq = Lgraph.CallPluginRequest.newBuilder().setName(name).setParam(param).setTimeout(timeout) - .setInProcess(inProcess).setResultInJsonFormat(jsonFormat).build(); + .setInProcess(inProcess).setResultInJsonFormat(withHeader).build(); Lgraph.PluginRequest req = Lgraph.PluginRequest.newBuilder().setType(type).setCallPluginRequest(vreq).setGraph(graph).build(); Lgraph.LGraphRequest request = @@ -696,7 +792,7 @@ public ByteString callProcedure(Lgraph.PluginRequest.PluginType type, String nam if (response.getErrorCode().getNumber() != Lgraph.LGraphResponse.ErrorCode.SUCCESS_VALUE) { throw new TuGraphDbRpcException(response.getErrorCode(), response.getError(), "CallProcedure"); } - if (jsonFormat) { + if (withHeader) { return response.getPluginResponse().getCallPluginResponse().getJsonResultBytes(); } else { return response.getPluginResponse().getCallPluginResponse().getReply(); @@ -779,7 +875,7 @@ public boolean deleteProcedure(String procedureType, return true; } - public boolean importSchemaFromContent(String schema, String graph, double timeout) throws InputException{ + public boolean importSchemaFromContent(String schema, String graph, double timeout) throws InputException, InvalidProtocolBufferException { byte[] textByte = schema.getBytes(StandardCharsets.UTF_8); String schema64 = Base64.getEncoder().encodeToString(textByte); String sb = "CALL db.importor.schemaImportor('" @@ -787,14 +883,14 @@ public boolean importSchemaFromContent(String schema, String graph, double timeo + "')"; String res = callCypher(sb, graph, timeout); // this built-in procedure always returns "[]" for null. - if (JSONArray.parseArray(res).size() != 0) { + if (!JSONArray.parseArray(res).isEmpty()) { throw new InputException(res); } return true; } public boolean importDataFromContent(String desc, String data, String delimiter, - boolean continueOnError, int threadNums, String graph, double timeout) throws UnsupportedEncodingException { + boolean continueOnError, int threadNums, String graph, double timeout) { byte[] textByteDesc = desc.getBytes(StandardCharsets.UTF_8); byte[] textByteData = data.getBytes(StandardCharsets.UTF_8); String desc64 = Base64.getEncoder().encodeToString(textByteDesc); @@ -812,7 +908,7 @@ public boolean importDataFromContent(String desc, String data, String delimiter, + "')"; String res = callCypher(sb, graph, timeout); // this built-in procedure always returns "[]" for null. - if (JSONArray.parseArray(res).size() != 0) { + if (!JSONArray.parseArray(res).isEmpty()) { throw new InputException(res); } return true; @@ -836,7 +932,7 @@ public boolean importSchemaFromFile(String schemaFile, String graph, double time + "')"; String res = callCypher(sb, graph, timeout); // this built-in procedure always returns "[]" for null. - if (JSONArray.parseArray(res).size() != 0) { + if (!JSONArray.parseArray(res).isEmpty()) { throw new InputException(res); } return true; @@ -892,11 +988,11 @@ public boolean importDataFromFile(String confFile, String delimiter, boolean con + parseDelimiter(delimiter) + "')"; String res = callCypher(sb, graph, timeout); - if (JSONArray.parseArray(res).size() != 0) { + if (!JSONArray.parseArray(res).isEmpty()) { throw new InputException(res); } // this built-in procedure always returns "[]" for null. - if (JSONArray.parseArray(res).size() != 0) { + if (!JSONArray.parseArray(res).isEmpty()) { throw new InputException(res); } buf = cutter.cut();