Skip to content

Commit eb06f99

Browse files
author
Your Name
committed
控制台和节点间的通讯方式改为tcp通讯
1 parent 531e16e commit eb06f99

18 files changed

+1009
-0
lines changed

admin/cmd/cmd.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package cmd
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
7+
"github.com/eolinker/goku-api-gateway/config"
8+
)
9+
10+
var (
11+
ErrorInvalidNodeInstance = errors.New("invalid instance value")
12+
ErrorInvalidNodeConfig = errors.New("invalid instance config")
13+
)
14+
15+
type Code string
16+
17+
const (
18+
None Code = "none"
19+
NodeRegister Code = "register"
20+
NodeRegisterResult Code = "register-result"
21+
NodeLevel Code = "level"
22+
Config Code = "config"
23+
Restart Code = "restart"
24+
Stop Code = "stop"
25+
Monitor Code = "monitor"
26+
EventClientLeave Code = "leave"
27+
Error Code = "error"
28+
)
29+
30+
func EncodeConfig(c *config.GokuConfig) ([]byte, error) {
31+
if c == nil {
32+
return nil, ErrorInvalidNodeConfig
33+
}
34+
35+
return json.Marshal(c)
36+
}
37+
func DecodeConfig(data []byte) (*config.GokuConfig, error) {
38+
if len(data) == 0 {
39+
return nil, ErrorInvalidNodeConfig
40+
}
41+
c := new(config.GokuConfig)
42+
err := json.Unmarshal(data, c)
43+
if err != nil {
44+
return nil, err
45+
}
46+
return c, nil
47+
}

admin/cmd/conn.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net"
7+
"sync"
8+
)
9+
10+
var (
11+
ErrorSendToClosedConnect = errors.New("send to closed connect")
12+
)
13+
14+
type Connect struct {
15+
conn net.Conn
16+
17+
//inputC chan _Frame
18+
outputC chan []byte
19+
20+
doneC chan struct{}
21+
22+
ctx context.Context
23+
cancelFunc context.CancelFunc
24+
25+
once sync.Once
26+
}
27+
28+
func NewConnect(conn net.Conn) *Connect {
29+
ctx, cancel := context.WithCancel(context.Background())
30+
c := &Connect{
31+
conn: conn,
32+
33+
//inputC: make(chan _Frame,10),
34+
outputC: make(chan []byte, 10),
35+
36+
ctx: ctx,
37+
cancelFunc: cancel,
38+
}
39+
40+
go c.r()
41+
42+
return c
43+
}
44+
func (c *Connect) r() {
45+
46+
for {
47+
frame, err := ReadFrame(c.conn)
48+
if err != nil {
49+
break
50+
}
51+
c.outputC <- frame
52+
}
53+
close(c.outputC)
54+
}
55+
56+
func (c *Connect) Close() error {
57+
58+
c.once.Do(func() {
59+
c.cancelFunc()
60+
c.conn.Close()
61+
62+
})
63+
64+
return nil
65+
}
66+
67+
func (c *Connect) LocalAddr() net.Addr {
68+
return c.conn.LocalAddr()
69+
}
70+
71+
func (c *Connect) RemoteAddr() net.Addr {
72+
return c.conn.RemoteAddr()
73+
}
74+
75+
func (c *Connect) Send(code Code, data []byte) error {
76+
return SendFrame(c.conn, code, data)
77+
}
78+
79+
func (c *Connect) ReadC() <-chan []byte {
80+
return c.outputC
81+
}
82+
func (c *Connect) Done() <-chan struct{} {
83+
return c.ctx.Done()
84+
}

admin/cmd/error.go

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package cmd
2+
3+
type ErrorInfo struct {
4+
Error string `json:"error"`
5+
}
6+
7+
func DecodeError(data []byte) (string, error) {
8+
9+
return string(data), nil
10+
}
11+
func EncodeError(err string) ([]byte, error) {
12+
13+
return []byte(err), nil
14+
}

admin/cmd/register.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package cmd
2+
3+
import (
4+
"encoding/json"
5+
"github.com/eolinker/goku-api-gateway/config"
6+
)
7+
8+
type RegisterResult struct {
9+
Code int
10+
Error string
11+
Config *config.GokuConfig
12+
}
13+
14+
func DecodeRegisterResult(data []byte) (*RegisterResult, error) {
15+
r := new(RegisterResult)
16+
err := json.Unmarshal(data, r)
17+
if err != nil {
18+
return nil, err
19+
}
20+
return r, nil
21+
}
22+
func EncodeRegisterResultConfig(c *config.GokuConfig) ([]byte, error) {
23+
r := RegisterResult{
24+
Code: 0,
25+
Error: "",
26+
Config: c,
27+
}
28+
return json.Marshal(r)
29+
}
30+
func EncodeRegisterResultError(err string) ([]byte, error) {
31+
r := RegisterResult{
32+
Code: -1,
33+
Error: err,
34+
Config: nil,
35+
}
36+
return json.Marshal(r)
37+
}
38+
39+
func DecodeRegister(data []byte) (string, error) {
40+
if len(data) == 32 {
41+
return string(data), nil
42+
}
43+
return "", ErrorInvalidNodeInstance
44+
}
45+
46+
func EncodeRegister(nodeKey string) ([]byte, error) {
47+
data := []byte(nodeKey)
48+
if len(data) == 32 {
49+
return data, nil
50+
}
51+
return nil, ErrorInvalidNodeInstance
52+
}

admin/cmd/server.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package cmd
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"errors"
7+
"io"
8+
9+
"github.com/eolinker/goku-api-gateway/common/ioutils"
10+
)
11+
12+
var (
13+
ErrorEmptyFrame = errors.New("empty frame")
14+
ErrorInvalidCode = errors.New("invalid code")
15+
)
16+
17+
func ReadFrame(reader io.Reader) ([]byte, error) {
18+
19+
sizeBuf := make([]byte, 4, 4)
20+
// 获取报文头部信息
21+
_, err := io.ReadFull(reader, sizeBuf)
22+
if err != nil {
23+
return nil, err
24+
}
25+
// 获取报文数据大小
26+
size := binary.BigEndian.Uint32(sizeBuf)
27+
28+
data := make([]byte, size, size)
29+
30+
_, e := io.ReadFull(reader, data)
31+
if e != nil {
32+
return nil, err
33+
}
34+
return data, nil
35+
}
36+
func GetCmd(frame []byte) (Code, []byte, error) {
37+
frameLen := len(frame)
38+
if frameLen < 5 {
39+
// 长度小于5时,报文没有数据
40+
return "", nil, ErrorEmptyFrame
41+
}
42+
buf := bytes.NewBuffer(frame)
43+
codeData, n, err := ioutils.ReadLField(buf, nil)
44+
if err != nil {
45+
return "", nil, err
46+
}
47+
48+
return Code(codeData), frame[n:], nil
49+
}
50+
51+
func SendError(w io.Writer, err error) {
52+
if err == nil {
53+
return
54+
}
55+
data := []byte(err.Error())
56+
SendFrame(w, Error, data)
57+
}
58+
func SendFrame(w io.Writer, code Code, data []byte) error {
59+
codeData := []byte(code)
60+
61+
size := uint32(len(data) + len(codeData) + 1)
62+
sizeAll := size + 4
63+
buf := bytes.NewBuffer(make([]byte, sizeAll, sizeAll))
64+
buf.Reset()
65+
66+
err := binary.Write(buf, binary.BigEndian, size)
67+
if err != nil {
68+
return err
69+
}
70+
71+
_, err = ioutils.WriteLField(buf, codeData)
72+
if err != nil {
73+
return err
74+
}
75+
76+
if len(data) > 0 {
77+
_, err := buf.Write(data)
78+
if err != nil {
79+
return err
80+
}
81+
}
82+
//b:= buf.Bytes()
83+
//_,err =w.Write(b)
84+
_, err = buf.WriteTo(w)
85+
return err
86+
}

admin/console/callback.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package console
2+
3+
import (
4+
"github.com/eolinker/goku-api-gateway/admin/cmd"
5+
goku_log "github.com/eolinker/goku-api-gateway/goku-log"
6+
)
7+
8+
type Callback func(code cmd.Code, data []byte,client *Client) error
9+
10+
func (c Callback) ServerCode(code cmd.Code, data []byte, client *Client) error{
11+
return c(code,data,client)
12+
}
13+
14+
type CodeHandler interface {
15+
ServerCode(code cmd.Code, data []byte,client *Client)error
16+
}
17+
18+
//Register cmd 回调注册器
19+
type Register struct {
20+
callbacks map[cmd.Code][]CodeHandler
21+
}
22+
//NewRegister create register
23+
func NewRegister() *Register {
24+
return &Register{
25+
callbacks: make(map[cmd.Code][]CodeHandler),
26+
}
27+
}
28+
29+
//Register 注册回调
30+
func (s *Register) Register(code cmd.Code,handler CodeHandler){
31+
s.callbacks[code] = append(s.callbacks[code],handler)
32+
}
33+
//Register 注册回调
34+
func (s *Register) RegisterFunc(code cmd.Code,callback func(code cmd.Code, data []byte,client *Client) error){
35+
s.callbacks[code] = append(s.callbacks[code],Callback(callback))
36+
}
37+
38+
//Callback 调用回调
39+
func (s *Register)Callback(code cmd.Code,data []byte,client *Client)error {
40+
m:=s.callbacks
41+
callbacks,has:= m[code]
42+
if !has{
43+
goku_log.Info("not exists call for ",code)
44+
return nil
45+
}
46+
for _,handler:=range callbacks{
47+
if e:=handler.ServerCode(code,data,client);e!=nil{
48+
return e
49+
}
50+
}
51+
return nil
52+
}

0 commit comments

Comments
 (0)