Skip to content

Commit

Permalink
[fix](RemoteUDF) fix string type do not set PGenericType #23832
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored and xiaokang committed Sep 5, 2023
1 parent ec9c39b commit 4d66e97
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 12 deletions.
2 changes: 2 additions & 0 deletions be/src/vec/data_types/serde/data_type_string_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ Status DataTypeStringSerDe::deserialize_one_cell_from_text(IColumn& column, Slic
Status DataTypeStringSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start,
int end) const {
result.mutable_bytes_value()->Reserve(end - start);
auto ptype = result.mutable_type();
ptype->set_id(PGenericType::STRING);
for (size_t row_num = start; row_num < end; ++row_num) {
StringRef data = column.get_data_at(row_num);
result.add_string_value(data.to_string());
Expand Down
49 changes: 45 additions & 4 deletions docs/en/docs/ecosystem/udf/remote-user-defined-function.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,19 @@ Instructions:

Sample:
```sql
CREATE FUNCTION rpc_add(INT, INT) RETURNS INT PROPERTIES (
"SYMBOL"="add_int",
"OBJECT_FILE"="127.0.0.1:9090",
CREATE FUNCTION rpc_add_two(INT,INT) RETURNS INT PROPERTIES (
"SYMBOL"="add_int_two",
"OBJECT_FILE"="127.0.0.1:9114",
"TYPE"="RPC"
);
CREATE FUNCTION rpc_add_one(INT) RETURNS INT PROPERTIES (
"SYMBOL"="add_int_one",
"OBJECT_FILE"="127.0.0.1:9114",
"TYPE"="RPC"
);
CREATE FUNCTION rpc_add_string(varchar(30)) RETURNS varchar(30) PROPERTIES (
"SYMBOL"="add_string",
"OBJECT_FILE"="127.0.0.1:9114",
"TYPE"="RPC"
);
```
Expand All @@ -106,4 +116,35 @@ The use of UDF is consistent with ordinary function methods. The only difference
When you no longer need UDF functions, you can delete a UDF function by the following command, you can refer to `DROP FUNCTION`.

## Example
Examples of rpc server implementations and cpp/java/python languages are provided in the `samples/doris-demo/` directory. See the `README.md` in each directory for details on how to use it.
Examples of rpc server implementations and cpp/java/python languages are provided in the `samples/doris-demo/` directory. See the `README.md` in each directory for details on how to use it.
For example, rpc_add_string
```
mysql >select rpc_add_string('doris');
+-------------------------+
| rpc_add_string('doris') |
+-------------------------+
| doris_rpc_test |
+-------------------------+
```
The logs will be displayed.

```
INFO: fnCall request=function_name: "add_string"
args {
type {
id: STRING
}
has_null: false
string_value: "doris"
}
INFO: fnCall res=result {
type {
id: STRING
}
has_null: false
string_value: "doris_rpc_test"
}
status {
status_code: 0
}
```
50 changes: 47 additions & 3 deletions docs/zh-CN/docs/ecosystem/udf/remote-user-defined-function.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,19 @@ PROPERTIES (["key"="value"][,...])

示例:
```sql
CREATE FUNCTION rpc_add(INT, INT) RETURNS INT PROPERTIES (
"SYMBOL"="add_int",
"OBJECT_FILE"="127.0.0.1:9090",
CREATE FUNCTION rpc_add_two(INT,INT) RETURNS INT PROPERTIES (
"SYMBOL"="add_int_two",
"OBJECT_FILE"="127.0.0.1:9114",
"TYPE"="RPC"
);
CREATE FUNCTION rpc_add_one(INT) RETURNS INT PROPERTIES (
"SYMBOL"="add_int_one",
"OBJECT_FILE"="127.0.0.1:9114",
"TYPE"="RPC"
);
CREATE FUNCTION rpc_add_string(varchar(30)) RETURNS varchar(30) PROPERTIES (
"SYMBOL"="add_string",
"OBJECT_FILE"="127.0.0.1:9114",
"TYPE"="RPC"
);
```
Expand All @@ -108,3 +118,37 @@ UDF 的使用与普通的函数方式一致,唯一的区别在于,内置函

## 示例
`samples/doris-demo/` 目录中提供和 cpp/java/python 语言的rpc server 实现示例。具体使用方法见每个目录下的`README.md`
例如rpc_add_string
```
mysql >select rpc_add_string('doris');
+-------------------------+
| rpc_add_string('doris') |
+-------------------------+
| doris_rpc_test |
+-------------------------+
```
日志会显示

```
INFO: fnCall request=function_name: "add_string"
args {
type {
id: STRING
}
has_null: false
string_value: "doris"
}
INFO: fnCall res=result {
type {
id: STRING
}
has_null: false
string_value: "doris_rpc_test"
}
status {
status_code: 0
}
```



Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public void fnCall(FunctionService.PFunctionCallRequest request,
// symbol is functionName
String functionName = request.getFunctionName();
logger.info("fnCall request=" + request);
FunctionService.PFunctionCallResponse res;
if ("add_int".equals(functionName)) {
FunctionService.PFunctionCallResponse res = null;
if ("add_int_two".equals(functionName)) {
res = FunctionService.PFunctionCallResponse.newBuilder()
.setStatus(Types.PStatus.newBuilder().setStatusCode(0).build())
.addResult(Types.PValues.newBuilder().setHasNull(false)
Expand All @@ -52,9 +52,26 @@ public void fnCall(FunctionService.PFunctionCallRequest request,
.getInt32Value(i)).collect(Collectors.toList()))
.setType(Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.INT32).build())
.build()).build();
} else {
}
else if ("add_int_one".equals(functionName)) {
res = FunctionService.PFunctionCallResponse.newBuilder()
.setStatus(Types.PStatus.newBuilder().setStatusCode(1).build()).build();
.setStatus(Types.PStatus.newBuilder().setStatusCode(0).build())
.addResult(Types.PValues.newBuilder().setHasNull(false)
.addAllInt32Value(IntStream.range(0, request.getArgs(0)
.getInt32ValueCount())
.mapToObj(i -> request.getArgs(0).getInt32Value(i) + 1).collect(Collectors.toList()))
.setType(Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.INT32).build())
.build()).build();
}
else if ("add_string".equals(functionName)) {
res = FunctionService.PFunctionCallResponse.newBuilder()
.setStatus(Types.PStatus.newBuilder().setStatusCode(0).build())
.addResult(Types.PValues.newBuilder().setHasNull(false)
.addAllStringValue(IntStream.range(0, request.getArgs(0)
.getStringValueCount())
.mapToObj(i -> request.getArgs(0).getStringValue(i) + "_rpc_test").collect(Collectors.toList()))
.setType(Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.STRING).build())
.build()).build();
}
logger.info("fnCall res=" + res);
ok(responseObserver, res);
Expand All @@ -66,12 +83,24 @@ public void checkFn(FunctionService.PCheckFunctionRequest request,
// symbol is functionName
logger.info("checkFn request=" + request);
int status = 0;
if ("add_int".equals(request.getFunction().getFunctionName())) {
if ("add_int_two".equals(request.getFunction().getFunctionName())) {
// check inputs count
if (request.getFunction().getInputsCount() != 2) {
status = -1;
}
}
if ("add_int_one".equals(request.getFunction().getFunctionName())) {
// check inputs count
if (request.getFunction().getInputsCount() != 1) {
status = -1;
}
}
if ("add_string".equals(request.getFunction().getFunctionName())) {
// check inputs count
if (request.getFunction().getInputsCount() != 1) {
status = -1;
}
}
FunctionService.PCheckFunctionResponse res =
FunctionService.PCheckFunctionResponse.newBuilder()
.setStatus(Types.PStatus.newBuilder().setStatusCode(status).build()).build();
Expand Down

0 comments on commit 4d66e97

Please sign in to comment.