Skip to content

Commit 7137f45

Browse files
authored
Merge pull request #19 from oldduckruirui/main
add demo for ZMTP
2 parents 1426f3c + 2d290e1 commit 7137f45

File tree

8 files changed

+313
-4
lines changed

8 files changed

+313
-4
lines changed

example/zmtp/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/pb

example/zmtp/demo.proto

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
syntax = "proto3";
2+
3+
package pb;
4+
5+
option go_package = ".;pb";
6+
7+
service Game {
8+
rpc Game(OrderRequest) returns (OrderResponse);
9+
}
10+
11+
message OrderRequest {
12+
string business_id = 1235;
13+
}
14+
15+
message OrderResponse {
16+
string msg = 1235;
17+
}
18+
19+
message MessageWrapper {
20+
oneof message {
21+
OrderRequest request = 1;
22+
OrderResponse response = 2;
23+
}
24+
}

example/zmtp/wasm.wasm

706 KB
Binary file not shown.

example/zmtp/zmtp.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/deepflowio/deepflow-wasm-go-sdk/example/zmtp/pb"
7+
"github.com/deepflowio/deepflow-wasm-go-sdk/sdk"
8+
sdkpb "github.com/deepflowio/deepflow-wasm-go-sdk/sdk/pb"
9+
)
10+
11+
//go:generate mkdir -p pb
12+
//go:generate protoc --go_out=./pb --go-vtproto_out=./pb --go-vtproto_opt=features=unmarshal ./demo.proto
13+
func main() {
14+
sdk.Info("zmtp-plugin loaded")
15+
parser := ZrpcParser{}
16+
parser.Parser = interface{}(parser).(sdk.Parser)
17+
sdk.SetParser(parser)
18+
}
19+
20+
type ZrpcParser struct {
21+
sdk.DefaultParser
22+
}
23+
24+
func (p ZrpcParser) HookIn() []sdk.HookBitmap {
25+
return []sdk.HookBitmap{
26+
sdk.HOOK_POINT_CUSTOM_MESSAGE,
27+
}
28+
}
29+
30+
func (p ZrpcParser) CustomMessageHookIn() uint64 {
31+
return sdk.CustomMessageHookProtocol(sdk.PROTOCOL_ZMTP, true)
32+
}
33+
34+
func (p ZrpcParser) onZMTPMessage(payload []byte) sdk.Action {
35+
var zmtpMsg sdkpb.ZmtpMessage
36+
if err := zmtpMsg.UnmarshalVT(payload); err != nil {
37+
return sdk.ActionNext()
38+
}
39+
var msgWrapper pb.MessageWrapper
40+
if err := msgWrapper.UnmarshalVT(zmtpMsg.Payload); err != nil {
41+
return sdk.ActionNext()
42+
}
43+
jsonData, err := json.Marshal(&msgWrapper)
44+
if err != nil {
45+
jsonData = nil
46+
}
47+
jsonStr := string(jsonData)
48+
return sdk.CustomMessageActionAbortWithResult([]sdk.KeyVal{
49+
{
50+
Key: "json_payload",
51+
Val: jsonStr,
52+
},
53+
})
54+
}
55+
56+
func (p ZrpcParser) OnCustomMessage(ctx *sdk.CustomMessageCtx) sdk.Action {
57+
if ctx.HookPoint == sdk.ProtocolParse && ctx.TypeCode == uint32(sdk.PROTOCOL_ZMTP) {
58+
return p.onZMTPMessage(ctx.Payload)
59+
} else {
60+
return sdk.ActionNext()
61+
}
62+
}

sdk/WasmPluginApi.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,10 @@ message NatsMessage {
88
string reply_to = 2;
99
bytes payload = 3;
1010
}
11+
12+
message ZmtpMessage {
13+
oneof subscription {
14+
string match_pattern = 1;
15+
}
16+
bytes payload = 2;
17+
}

sdk/parser.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ var (
5454

5555
var (
5656
PROTOCOL_NATS uint16 = 104
57+
PROTOCOL_ZMTP uint16 = 106
5758
)
5859

5960
var CUSTOM_MESSAGE_HOOK_ALL uint64 = 0xff << 48

sdk/pb/WasmPluginApi.pb.go

Lines changed: 101 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/pb/WasmPluginApi_vtproto.pb.go

Lines changed: 117 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)